File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
import os
import re
import shlex
0014
0015 from pilot.common.exception import StageInFailure, StageOutFailure, ErrorCodes, PilotException
0016 from pilot.util.container import execute
0017
0018 import logging
0019 logger = logging.getLogger(__name__)
0020
# Module-level switches for this copy tool; both are disabled.
# NOTE(review): nothing in this file reads them -- presumably the code that loads copy tools
# checks them to decide whether to resolve input replicas / verify free space; confirm against the caller.
require_replicas = False
check_availablespace = False
0023
0024
def create_output_list(files, init_dir, ddmconf):
    """
    Append one line per file to ``output.list``, the list which tells ARC CE which files to upload.

    Each line has the form ``<lfn> <url>``. The URL is derived from the file's TURL:
    an ``s3://`` TURL is rewritten to ``s3+rucio://`` and embedded in a Rucio object-store URL,
    any other TURL gets the space token (when one is known) and the checksum options added.

    :param files: list of `FileSpec` objects; ``turl``, ``ddmendpoint`` and ``lfn`` are read,
                  plus ``checksum`` (a dict) for non-s3 TURLs
    :param init_dir: directory in which ``output.list`` is created or appended to
    :param ddmconf: mapping of DDM endpoint name to an object carrying a ``token`` attribute
    :raises PilotException: if `ddmconf` is empty or None
    """

    if not ddmconf:
        raise PilotException("copy_out() failed to resolve ddmconf from function arguments",
                             code=ErrorCodes.STAGEOUTFAILED,
                             state='COPY_ERROR')

    for fspec in files:
        arcturl = fspec.turl
        if arcturl.startswith('s3://'):
            # switch the scheme to s3+rucio and wrap the TURL in the Rucio object-store URL
            arcturl = re.sub(r'^s3', 's3+rucio', arcturl)
            rucio = 'rucio://rucio-lb-prod.cern.ch;failureallowed=yes/objectstores'
            rse = fspec.ddmendpoint
            activity = 'write'
            arcturl = '/'.join([rucio, arcturl, rse, activity])
        else:
            # only the first checksum entry is used
            checksumtype, checksum = list(fspec.checksum.items())[0]

            # an endpoint missing from ddmconf is treated like an endpoint without a token
            # (previously this raised AttributeError on None)
            token = getattr(ddmconf.get(fspec.ddmendpoint), 'token', None)
            if not token:
                logger.info('No space token info for %s', fspec.ddmendpoint)
            else:
                # insert the options right after the port; a callable replacement keeps the token
                # literal (no backslash/group expansion) and count=1 leaves any later ":<digits>/"
                # in the path untouched
                arcturl = re.sub(r'((:\d+)/)',
                                 lambda match: '%s;autodir=no;spacetoken=%s/' % (match.group(2), token),
                                 arcturl,
                                 count=1)
            arcturl += ':checksumtype=%s:checksumvalue=%s' % (checksumtype, checksum)

        logger.info('Adding to output.list: %s %s', fspec.lfn, arcturl)

        # append immediately so entries already processed are kept if a later file fails
        with open(os.path.join(init_dir, 'output.list'), 'a') as f:
            f.write('%s %s\n' % (fspec.lfn, arcturl))
0060
0061
def is_valid_for_copy_in(files):
    """
    Tell whether the given files can be staged in with this copy tool.

    No check is performed: `files` is ignored and the answer is always True.

    :param files: list of `FileSpec` objects (unused)
    :return: True
    """

    return True
0064
0065
0066
0067
0068
0069
def is_valid_for_copy_out(files):
    """
    Tell whether the given files can be staged out with this copy tool.

    No check is performed: `files` is ignored and the answer is always True.

    :param files: list of `FileSpec` objects (unused)
    :return: True
    """

    return True
0072
0073
0074
0075
0076
0077
def copy_in(files, copy_type="symlink", **kwargs):
    """
    Stage in the given files locally using ``ln -s`` (the default), ``cp`` or ``mv``.

    :param files: list of `FileSpec` objects
    :param copy_type: one of "symlink", "cp" or "mv"
    :param kwargs: ``workdir`` (required) and ``allow_direct_access`` (optional) are read
    :return: the `files` list that was passed in
    :raises PilotException: StageInFailure
    """

    # direct access cannot be served by this copy tool: flag the file and fail
    allow_direct_access = kwargs.get('allow_direct_access')
    for fspec in files:
        if fspec.is_directaccess(ensure_replica=False) and allow_direct_access and fspec.accessmode == 'direct':
            fspec.status_code = ErrorCodes.BADQUEUECONFIGURATION
            raise StageInFailure("bad queue configuration - mv does not support direct access")

    if copy_type not in ["cp", "mv", "symlink"]:
        raise StageInFailure("incorrect method for copy in")

    workdir = kwargs.get('workdir')
    if not workdir:
        raise StageInFailure("workdir is not specified")

    exit_code, stdout, stderr = move_all_files(files, copy_type, workdir)
    if exit_code != 0:
        # fall back to stderr when stdout is empty so the failure reason is not lost
        raise StageInFailure(stdout or stderr)

    return files
0105
0106
def copy_out(files, copy_type="mv", **kwargs):
    """
    Stage out the given files locally using ``mv`` (the default) or ``cp``.

    After a successful transfer, and unless an ``output_dir`` keyword argument is given, the files
    are also added to ``output.list`` in the parent directory of ``workdir``.

    :param files: list of `FileSpec` objects
    :param copy_type: "mv" or "cp"
    :param kwargs: ``workdir`` (required), ``output_dir`` and ``ddmconf`` (optional) are read
    :return: the `files` list that was passed in
    :raises PilotException: StageOutFailure
    """

    if copy_type not in ["cp", "mv"]:
        raise StageOutFailure("incorrect method for copy out")

    workdir = kwargs.get('workdir')
    if not workdir:
        raise StageOutFailure("Workdir is not specified")

    exit_code, stdout, stderr = move_all_files(files, copy_type, workdir)
    if exit_code != 0:
        # fall back to stderr when stdout is empty so the failure reason is not lost
        raise StageOutFailure(stdout or stderr)

    # normpath drops a trailing separator, which would otherwise make dirname()
    # return the work directory itself instead of its parent
    init_dir = os.path.dirname(os.path.normpath(workdir))
    logger.debug('init_dir for output.list=%s', init_dir)

    # the output list is only written when no output_dir was given
    output_dir = kwargs.get('output_dir', '')
    if not output_dir:
        create_output_list(files, init_dir, kwargs.get('ddmconf', None))

    return files
0133
0134
def move_all_files(files, copy_type, workdir):
    """
    Transfer every file between the work directory and its parent directory.

    Files with ``filetype == 'input'`` go from the parent of `workdir` into `workdir`; all other
    files go from `workdir` to its parent. Each file's ``status`` and ``status_code`` are updated,
    and processing stops at the first failed transfer.

    :param files: list of `FileSpec` objects
    :param copy_type: "mv", "cp" or "symlink"
    :param workdir: work directory
    :return: exit_code, stdout, stderr of the last transfer attempted; (0, "", "") when `files`
             is empty and (-1, "", "incorrect copy method") for an unknown `copy_type`
    """

    if copy_type == "mv":
        copy_method = move
    elif copy_type == "cp":
        copy_method = copy
    elif copy_type == "symlink":
        copy_method = symlink
    else:
        return -1, "", "incorrect copy method"

    exit_code = 0
    stdout = ""
    stderr = ""

    for fspec in files:
        # normpath drops a trailing separator, which would otherwise make dirname() return
        # the work directory itself and turn source and destination into the same path
        parent_dir = os.path.dirname(os.path.normpath(workdir))

        name = fspec.lfn
        if fspec.filetype == 'input':
            source = os.path.join(parent_dir, name)
            destination = os.path.join(workdir, name)
        else:
            source = os.path.join(workdir, name)
            destination = os.path.join(parent_dir, name)

        # resolve symbolic links in the source path
        source = os.path.realpath(source)

        logger.info("transferring file %s from %s to %s", name, source, destination)

        exit_code, stdout, stderr = copy_method(source, destination)
        if exit_code != 0:
            logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
            fspec.status = 'failed'
            if fspec.filetype == 'input':
                fspec.status_code = ErrorCodes.STAGEINFAILED
            else:
                fspec.status_code = ErrorCodes.STAGEOUTFAILED
            # remaining files are not attempted
            break

        fspec.status_code = 0
        fspec.status = 'transferred'

    return exit_code, stdout, stderr
0187
0188
def move(source, destination):
    """
    Move `source` to `destination` by running ``mv`` through `execute`.

    :param source: path of the file to move
    :param destination: path to move the file to

    :return: exit_code, stdout, stderr
    """

    # the command is handed over as one string, so quote both paths to keep spaces and
    # shell metacharacters in file names from splitting or altering the command
    # NOTE(review): assumes execute() parses the string with shell rules -- confirm
    executable = ['/usr/bin/env', 'mv', shlex.quote(source), shlex.quote(destination)]
    cmd = ' '.join(executable)
    exit_code, stdout, stderr = execute(cmd)

    return exit_code, stdout, stderr
0204
0205
def copy(source, destination):
    """
    Copy `source` to `destination` by running ``cp`` through `execute`.

    :param source: path of the file to copy
    :param destination: path to copy the file to

    :return: exit_code, stdout, stderr
    """

    # the command is handed over as one string, so quote both paths to keep spaces and
    # shell metacharacters in file names from splitting or altering the command
    # NOTE(review): assumes execute() parses the string with shell rules -- confirm
    executable = ['/usr/bin/env', 'cp', shlex.quote(source), shlex.quote(destination)]
    cmd = ' '.join(executable)
    exit_code, stdout, stderr = execute(cmd)

    return exit_code, stdout, stderr
0221
0222
def symlink(source, destination):
    """
    Create a symbolic link at `destination` pointing to `source` by running ``ln -s`` through `execute`.

    :param source: path the link points to
    :param destination: path of the link to create

    :return: exit_code, stdout, stderr
    """

    # the command is handed over as one string, so quote both paths to keep spaces and
    # shell metacharacters in file names from splitting or altering the command
    # NOTE(review): assumes execute() parses the string with shell rules -- confirm
    executable = ['/usr/bin/env', 'ln', '-s', shlex.quote(source), shlex.quote(destination)]
    cmd = ' '.join(executable)
    exit_code, stdout, stderr = execute(cmd)

    return exit_code, stdout, stderr