Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import gc
0002 import os
0003 import subprocess
0004 import tempfile
0005 import uuid
0006 
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestermover import mover_utils
0009 from pandaharvester.harvesterstager.base_stager import BaseStager
0010 
0011 # logger
0012 _logger = core_utils.setup_logger("xrdcp_stager")
0013 
0014 # stager plugin with https://xrootd.slac.stanford.edu/  xrdcp
0015 """
0016   -- Example of plugin config
0017     "stager": {
0018         "name": "XrdcpStager",
0019         "module": "pandaharvester.harvesterstager.xrdcp_stager",
0020         # base path for destination xrdcp server
0021         "dstBasePath": " root://dcgftp.usatlas.bnl.gov:1096//pnfs/usatlas.bnl.gov/BNLT0D1/rucio",
0022         # base path for local access to the copied files
0023         "localBasePath": "/hpcgpfs01/scratch/benjamin/harvester/rucio-data-area",
0024         # max number of attempts
0025         "maxAttempts": 3,
0026         # check paths under localBasePath.
0027         "checkLocalPath": true,
0028         # options for xrdcp
0029         "xrdcpOpts": "--retry 3 --cksum adler32 --debug 1"
0030     }
0031 """
0032 
0033 
# stager plugin that copies output files to an xrootd endpoint with xrdcp
class XrdcpStager(BaseStager):
    """Stage-out plugin which transfers each output file with the xrdcp client.

    Configurable attributes (injected by the plugin config through BaseStager):
      dstBasePath    -- base path of the destination xrootd server (required)
      localBasePath  -- base path for local access to copied files (required)
      xrdcpOpts      -- extra command-line options appended to every xrdcp call
      maxAttempts    -- per-file retry limit before the file is marked failed
      timeout        -- seconds to wait for each xrdcp subprocess (None = forever)
      checkLocalPath -- when True, skip files already present locally with a
                        matching adler32 checksum
    """

    # constructor
    def __init__(self, **kwarg):
        BaseStager.__init__(self, **kwarg)
        # fill in defaults for optional attributes the config may omit
        if not hasattr(self, "xrdcpOpts"):
            self.xrdcpOpts = None
        if not hasattr(self, "maxAttempts"):
            self.maxAttempts = 3
        if not hasattr(self, "timeout"):
            self.timeout = None
        if not hasattr(self, "checkLocalPath"):
            self.checkLocalPath = True

    # check status
    def check_stage_out_status(self, jobspec):
        """Check the status of stage-out procedure. If staging-out is done synchronously in trigger_stage_out
        this method should always return True.
        Output files are available through jobspec.get_outfile_specs(skip_done=False) which gives
        a list of FileSpecs not yet done.
        FileSpec.attemptNr shows how many times the transfer was checked for the file.
        If the file was successfully transferred, status should be set to 'finished'.
        Or 'failed', if the file failed to be transferred. Once files are set to 'finished' or 'failed',
        jobspec.get_outfile_specs(skip_done=False) ignores them.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True: transfer success, False: fatal transfer failure,
                 None: on-going or temporary failure) and error dialog
        :rtype: (bool, string)
        """
        # transfers are done synchronously in trigger_stage_out, so everything
        # still pending here is simply marked finished
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            fileSpec.status = "finished"
        return True, ""

    # trigger stage out
    def trigger_stage_out(self, jobspec):
        """Trigger the stage-out procedure for the job.
        Output files are available through jobspec.get_outfile_specs(skip_done=False) which gives
        a list of FileSpecs not yet done.
        FileSpec.attemptNr shows how many times transfer was tried for the file so far.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True: success, False: fatal failure, None: temporary failure)
                 and error dialog
        :rtype: (bool, string)
        """

        # let gc clean up memory
        gc.collect()

        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")
        # get the environment
        harvester_env = os.environ.copy()
        # tmpLog.debug('Harvester environment : {}'.format(harvester_env))

        xrdcpOutput = None
        allfiles_transfered = True
        overall_errMsg = ""
        fileAttrs = jobspec.get_output_file_attributes()
        # loop over all output files
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            # construct destination path on the xrootd server
            dstPath = mover_utils.construct_file_path(self.dstBasePath, fileAttrs[fileSpec.lfn]["scope"], fileSpec.lfn)
            # the local source file is the path recorded in the file spec
            tmpLog.debug(f"fileSpec.path - {fileSpec.path} fileSpec.lfn = {fileSpec.lfn}")
            localPath = fileSpec.path
            if self.checkLocalPath:
                # skip the file if it already exists with the expected checksum
                if os.path.exists(localPath):
                    checksum = core_utils.calc_adler32(localPath)
                    checksum = f"ad:{checksum}"
                    if checksum == fileAttrs[fileSpec.lfn]["checksum"]:
                        continue
            # collect list of output files (deduplicated, order preserved)
            if xrdcpOutput is None:
                xrdcpOutput = [dstPath]
            elif dstPath not in xrdcpOutput:
                xrdcpOutput.append(dstPath)
            # transfer using xrdcp one file at a time
            tmpLog.debug("execute xrdcp")
            args = ["xrdcp", "--nopbar", "--force"]
            if self.xrdcpOpts is not None:
                args += self.xrdcpOpts.split()
            args += [localPath, dstPath]
            fileSpec.attemptNr += 1
            # pre-bind names so the del below is safe even if Popen raises
            process = None
            stdout = None
            stderr = None
            try:
                tmpLog.debug(f"execute: {' '.join(args)}")
                # pass the argument list directly (shell=False) so paths with
                # spaces or shell metacharacters cannot break the command
                process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=harvester_env)
                try:
                    stdout, stderr = process.communicate(timeout=self.timeout)
                except subprocess.TimeoutExpired:
                    process.kill()
                    stdout, stderr = process.communicate()
                    tmpLog.warning("command timeout")
                return_code = process.returncode
                if stdout is not None:
                    if not isinstance(stdout, str):
                        stdout = stdout.decode()
                    stdout = stdout.replace("\n", " ")
                if stderr is not None:
                    if not isinstance(stderr, str):
                        stderr = stderr.decode()
                    stderr = stderr.replace("\n", " ")
                tmpLog.debug(f"stdout: {stdout}")
                tmpLog.debug(f"stderr: {stderr}")
            except Exception:
                core_utils.dump_error_message(tmpLog)
                return_code = 1
            if return_code == 0:
                fileSpec.status = "finished"
            else:
                overall_errMsg += f"file - {localPath} did not transfer error code {return_code} "
                allfiles_transfered = False
                errMsg = f"failed with {return_code}"
                tmpLog.error(errMsg)
                # mark the file failed only once the retry budget is exhausted;
                # otherwise leave it pending so a later attempt can retry it
                if fileSpec.attemptNr >= self.maxAttempts:
                    tmpLog.error(f"reached maxattempts: {self.maxAttempts}, marked it as failed")
                    fileSpec.status = "failed"

            # force update
            fileSpec.force_update("status")
            tmpLog.debug(f"file: {fileSpec.lfn} status: {fileSpec.status}")
            # drop references so gc can reclaim the subprocess output buffers
            del process, stdout, stderr

        # end loop over output files

        # nothing to transfer
        if xrdcpOutput is None:
            tmpLog.debug("done with no transfers")
            return True, ""
        # check if all files were transfered
        tmpLog.debug("done")
        if allfiles_transfered:
            return True, ""
        else:
            # temporary failure: pending files keep their attempt counters
            return None, overall_errMsg

    # zip output files

    def zip_output(self, jobspec):
        """OBSOLETE : zip functions should be implemented in zipper plugins.
        Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs,
        to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of
        associated files to be zipped. The path of each associated file is available in associated
        file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and
        FileSpec.chksum need to be set.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)

    # asynchronous zip output
    def async_zip_output(self, jobspec):
        """OBSOLETE : zip functions should be implemented in zipper plugins.
        Zip output files asynchronously. This method is followed by post_zip_output(),
        which is typically useful to trigger an asynchronous zipping mechanism such as batch job.
        This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs, to make
        a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs
        of associated files to be zipped. The path of each associated file is available in associated
        file's FileSpec.path.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        # set some ID which can be used for lookup in post_zip_output()
        groupID = str(uuid.uuid4())
        lfns = [fileSpec.lfn for fileSpec in jobspec.outFiles]
        jobspec.set_groups_to_files({groupID: {"lfns": lfns, "groupStatus": "zipping"}})
        return True, ""

    # post zipping
    def post_zip_output(self, jobspec):
        """OBSOLETE : zip functions should be implemented in zipper plugins.
        This method is executed after async_zip_output(), to do post-processing for zipping.
        Once zip files are made, this method needs to look over jobspec.outFiles to set their
        FileSpec.path, FileSpec.fsize, and FileSpec.chksum.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        # get groups for lookup (placeholder: a real implementation would act on these)
        groups = jobspec.get_groups_of_output_files()
        # update file attributes with dummy values
        for fileSpec in jobspec.outFiles:
            fileSpec.path = "/path/to/zip"
            fileSpec.fsize = 12345
            fileSpec.chksum = "66bb0985"
        return True, ""