Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0009 # - Tobias Wegner, tobias.wegner@cern.ch, 2018
0010 # - David Cameron, david.cameron@cern.ch, 2018-2019
0011 
0012 import os
0013 import re
0014 
0015 from pilot.common.exception import StageInFailure, StageOutFailure, ErrorCodes, PilotException
0016 from pilot.util.container import execute
0017 
0018 import logging
0019 logger = logging.getLogger(__name__)
0020 
# indicate if given copytool requires input replicas to be resolved
require_replicas = False
# indicate whether space check should be applied before stage-in transfers using given copytool
check_availablespace = False
0023 
0024 
def create_output_list(files, init_dir, ddmconf):
    """
    Add files to the output list which tells ARC CE which files to upload.

    One line of the form "<lfn> <arc turl>" is appended to <init_dir>/output.list
    for every file. Nothing is written (and no file is created) when `files` is empty.

    :param files: iterable of file specs; lfn, turl, ddmendpoint and (for non-s3 TURLs) checksum are read.
    :param init_dir: directory in which output.list is created / appended to.
    :param ddmconf: mapping of ddmendpoint name to an object with a `token` attribute.
    :raises PilotException: if ddmconf is empty or None.
    """

    if not ddmconf:
        raise PilotException("copy_out() failed to resolve ddmconf from function arguments",
                             code=ErrorCodes.STAGEOUTFAILED,
                             state='COPY_ERROR')

    entries = []
    for fspec in files:
        arcturl = fspec.turl
        if arcturl.startswith('s3://'):
            # Use Rucio proxy to upload to OS
            arcturl = re.sub(r'^s3', 's3+rucio', arcturl)
            # Add failureallowed option so failed upload does not fail job
            rucio = 'rucio://rucio-lb-prod.cern.ch;failureallowed=yes/objectstores'
            arcturl = '/'.join([rucio, arcturl, fspec.ddmendpoint, 'write'])
        else:
            # Add ARC options to TURL
            checksumtype, checksum = list(fspec.checksum.items())[0]  # Python 2/3
            # resolve token value from fspec.ddmendpoint; an endpoint that is missing
            # from ddmconf is treated like an endpoint without a space token
            token = getattr(ddmconf.get(fspec.ddmendpoint), 'token', None)
            if not token:
                logger.info('No space token info for %s', fspec.ddmendpoint)
            else:
                # a callable replacement keeps the token literal (no backslash/group
                # interpretation as would happen in a replacement template)
                arcturl = re.sub(r'((:\d+)/)',
                                 lambda match: '%s;autodir=no;spacetoken=%s/' % (match.group(2), token),
                                 arcturl)
            arcturl += ':checksumtype=%s:checksumvalue=%s' % (checksumtype, checksum)

        logger.info('Adding to output.list: %s %s', fspec.lfn, arcturl)
        entries.append('%s %s\n' % (fspec.lfn, arcturl))

    # Write output.list (opened once, and only if there is something to add)
    if entries:
        with open(os.path.join(init_dir, 'output.list'), 'a') as f:
            f.writelines(entries)
0060 
0061 
def is_valid_for_copy_in(files):
    """
    Tell whether the given files can be staged in with this copytool.

    No validation is implemented yet, so every input is accepted.

    :param files: files to be checked (currently ignored).
    :return: always True.
    """

    # TODO: implement a real check of the file specifications (FIX ME LATER)
    return True
0068 
0069 
def is_valid_for_copy_out(files):
    """
    Tell whether the given files can be staged out with this copytool.

    No validation is implemented yet, so every input is accepted.

    :param files: files to be checked (currently ignored).
    :return: always True.
    """

    # TODO: implement a real check of the file specifications (FIX ME LATER)
    return True
0076 
0077 
def copy_in(files, copy_type="symlink", **kwargs):
    """
    Stage in the given files locally by delegating to move_all_files().

    :param files: list of `FileSpec` objects
    :param copy_type: transfer method, one of "cp", "mv" or "symlink"
    :param kwargs: 'workdir' (mandatory) and 'allow_direct_access' are read
    :raises StageInFailure: on direct access requests, unknown copy_type, missing workdir or failed transfer
    :return: the `files` list that was passed in
    """

    # direct access must not be attempted with this copytool
    # (wrong queue configuration - pilot should fail job)
    direct_access_allowed = kwargs.get('allow_direct_access')
    for fspec in files:
        wants_direct = (fspec.is_directaccess(ensure_replica=False)
                        and direct_access_allowed
                        and fspec.accessmode == 'direct')
        if not wants_direct:
            continue
        fspec.status_code = ErrorCodes.BADQUEUECONFIGURATION
        raise StageInFailure("bad queue configuration - mv does not support direct access")

    if copy_type not in ("cp", "mv", "symlink"):
        raise StageInFailure("incorrect method for copy in")

    workdir = kwargs.get('workdir')
    if not workdir:
        raise StageInFailure("workdir is not specified")

    exit_code, stdout, _ = move_all_files(files, copy_type, workdir)
    if exit_code != 0:
        # the transfer failed: report it through the stage-in exception
        raise StageInFailure(stdout)

    return files
0105 
0106 
def copy_out(files, copy_type="mv", **kwargs):
    """
    Stage out the given files locally by delegating to move_all_files().

    When no 'output_dir' is given, the files are also added to the output.list
    in the parent directory of the workdir (see create_output_list()).

    :param files: list of `FileSpec` objects
    :param copy_type: transfer method, "cp" or "mv"
    :param kwargs: 'workdir' (mandatory), 'output_dir' and 'ddmconf' are read
    :raises StageOutFailure: on unknown copy_type, missing workdir or failed transfer
    :return: the `files` list that was passed in
    """

    if copy_type not in ("cp", "mv"):
        raise StageOutFailure("incorrect method for copy out")

    workdir = kwargs.get('workdir')
    if not workdir:
        raise StageOutFailure("Workdir is not specified")

    exit_code, stdout, _ = move_all_files(files, copy_type, workdir)
    if exit_code != 0:
        # the transfer failed: report it through the stage-out exception
        raise StageOutFailure(stdout)

    # Create output list for ARC CE if necessary
    init_dir = os.path.dirname(workdir)
    logger.debug('init_dir for output.list=%s', init_dir)
    if not kwargs.get('output_dir', ''):
        create_output_list(files, init_dir, kwargs.get('ddmconf', None))

    return files
0133 
0134 
def move_all_files(files, copy_type, workdir):
    """
    Transfer all files between the workdir and its parent directory.

    Files with filetype 'input' go from the parent directory of workdir into
    workdir; all other files go the opposite way. The loop stops at the first
    failed transfer. status and status_code are updated on every processed file.

    :param files: list of `FileSpec` objects
    :param copy_type: "mv", "cp" or "symlink"
    :param workdir: working directory
    :return: exit_code, stdout, stderr (of the last transfer attempted)
    """

    if copy_type == "symlink":
        transfer = symlink
    elif copy_type == "cp":
        transfer = copy
    elif copy_type == "mv":
        transfer = move
    else:
        return -1, "", "incorrect copy method"

    exit_code, stdout, stderr = 0, "", ""

    for fspec in files:
        lfn = fspec.lfn
        is_input = fspec.filetype == 'input'

        # Assumes pilot runs in subdir one level down from working dir
        parent_dir = os.path.dirname(workdir)
        from_dir, to_dir = (parent_dir, workdir) if is_input else (workdir, parent_dir)

        # the source is resolved to its canonical path, the destination is used as is
        source = os.path.realpath(os.path.join(from_dir, lfn))
        destination = os.path.join(to_dir, lfn)

        logger.info("transferring file %s from %s to %s", lfn, source, destination)

        exit_code, stdout, stderr = transfer(source, destination)
        if exit_code == 0:
            fspec.status_code = 0
            fspec.status = 'transferred'
            continue

        # first failure ends the loop
        logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
        fspec.status = 'failed'
        fspec.status_code = ErrorCodes.STAGEINFAILED if fspec.filetype == 'input' else ErrorCodes.STAGEOUTFAILED
        break

    return exit_code, stdout, stderr
0187 
0188 
def move(source, destination):
    """
    Move a file with the system mv command.

    The command is handed to execute() as one string, so both paths are
    shell-quoted to keep names with spaces or shell metacharacters intact.

    :param source: path of the file to move.
    :param destination: path the file is moved to.

    :return: exit_code, stdout, stderr
    """

    try:
        from shlex import quote  # Python 3
    except ImportError:
        from pipes import quote  # Python 2

    executable = ['/usr/bin/env', 'mv', quote(source), quote(destination)]
    cmd = ' '.join(executable)
    exit_code, stdout, stderr = execute(cmd)

    return exit_code, stdout, stderr
0204 
0205 
def copy(source, destination):
    """
    Copy a file with the system cp command.

    The command is handed to execute() as one string, so both paths are
    shell-quoted to keep names with spaces or shell metacharacters intact.

    :param source: path of the file to copy.
    :param destination: path the file is copied to.

    :return: exit_code, stdout, stderr
    """

    try:
        from shlex import quote  # Python 3
    except ImportError:
        from pipes import quote  # Python 2

    executable = ['/usr/bin/env', 'cp', quote(source), quote(destination)]
    cmd = ' '.join(executable)
    exit_code, stdout, stderr = execute(cmd)

    return exit_code, stdout, stderr
0221 
0222 
def symlink(source, destination):
    """
    Create a symbolic link to a file with the system ln -s command.

    The command is handed to execute() as one string, so both paths are
    shell-quoted to keep names with spaces or shell metacharacters intact.

    :param source: path the link points to.
    :param destination: path of the link to create.

    :return: exit_code, stdout, stderr
    """

    try:
        from shlex import quote  # Python 3
    except ImportError:
        from pipes import quote  # Python 2

    executable = ['/usr/bin/env', 'ln', '-s', quote(source), quote(destination)]
    cmd = ' '.join(executable)
    exit_code, stdout, stderr = execute(cmd)

    return exit_code, stdout, stderr