File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import os
0013 import json
0014 import logging
0015
0016 from .common import resolve_common_transfer_errors
0017 from pilot.common.exception import PilotException, ErrorCodes
0018
0019 from pilot.util.container import execute
0020 from pilot.util.ruciopath import get_rucio_path
0021
0022 logger = logging.getLogger(__name__)
0023
0024
0025 require_replicas = False
0026
0027 require_input_protocols = True
0028 require_protocols = True
0029
0030 allowed_schemas = ['srm', 'gsiftp', 'https', 'davs', 'root', 's3', 's3+rucio']
0031
0032
0033 def is_valid_for_copy_in(files):
0034 return True
0035
0036
0037 def is_valid_for_copy_out(files):
0038 return True
0039
0040
0041 def resolve_surl(fspec, protocol, ddmconf, **kwargs):
0042 """
0043 Get final destination SURL for file to be transferred to Objectstore
0044 Can be customized at the level of specific copytool
0045
0046 :param protocol: suggested protocol
0047 :param ddmconf: full ddm storage data
0048 :param fspec: file spec data
0049 :return: dictionary {'surl': surl}
0050 """
0051 ddm = ddmconf.get(fspec.ddmendpoint)
0052 if not ddm:
0053 raise PilotException('Failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint)
0054
0055 if ddm.is_deterministic:
0056 surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), get_rucio_path(fspec.scope, fspec.lfn))
0057 elif ddm.type in ['OS_ES', 'OS_LOGS']:
0058 surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), fspec.lfn)
0059 fspec.protocol_id = protocol.get('id')
0060 else:
0061 raise PilotException('resolve_surl(): Failed to construct SURL for non deterministic ddm=%s: NOT IMPLEMENTED', fspec.ddmendpoint)
0062
0063 return {'surl': surl}
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094 def copy_in(files, **kwargs):
0095 """
0096 Download given files using rucio copytool.
0097
0098 :param files: list of `FileSpec` objects
0099 :raise: PilotException in case of controlled error
0100 """
0101
0102
0103 os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'
0104
0105 ddmconf = kwargs.pop('ddmconf', {})
0106
0107
0108
0109 for fspec in files:
0110
0111 cmd = []
0112 logger.info("To transfer file: %s", fspec)
0113 if fspec.protocol_id:
0114 ddm = ddmconf.get(fspec.ddmendpoint)
0115 if ddm:
0116 ddm_special_setup = ddm.get_special_setup(fspec.protocol_id)
0117 if ddm_special_setup:
0118 cmd = [ddm_special_setup]
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131 dst = fspec.workdir or kwargs.get('workdir') or '.'
0132 cmd += ['/usr/bin/env', 'rucio', '-v', 'download', '--no-subdir', '--dir', dst]
0133 if require_replicas:
0134 cmd += ['--rse', fspec.replicas[0]['ddmendpoint']]
0135
0136
0137
0138 turl = fspec.turl or fspec.surl
0139 if turl:
0140 if fspec.ddmendpoint:
0141 cmd.extend(['--rse', fspec.ddmendpoint])
0142 cmd.extend(['--pfn', turl])
0143 cmd += ['%s:%s' % (fspec.scope, fspec.lfn)]
0144
0145 rcode, stdout, stderr = execute(" ".join(cmd), **kwargs)
0146
0147 if rcode:
0148 error = resolve_common_transfer_errors(stderr, is_stagein=True)
0149 fspec.status = 'failed'
0150 fspec.status_code = error.get('rcode')
0151 raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))
0152
0153 fspec.status_code = 0
0154 fspec.status = 'transferred'
0155
0156 return files
0157
0158
0159 def copy_out(files, **kwargs):
0160 """
0161 Upload given files using rucio copytool.
0162
0163 :param files: list of `FileSpec` objects
0164 :raise: PilotException in case of controlled error
0165 """
0166
0167
0168 os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'
0169
0170 no_register = kwargs.pop('no_register', True)
0171 summary = kwargs.pop('summary', False)
0172 ddmconf = kwargs.pop('ddmconf', {})
0173
0174
0175 for fspec in files:
0176 cmd = []
0177 if fspec.protocol_id:
0178 ddm = ddmconf.get(fspec.ddmendpoint)
0179 if ddm:
0180 ddm_special_setup = ddm.get_special_setup(fspec.protocol_id)
0181 if ddm_special_setup:
0182 cmd = [ddm_special_setup]
0183
0184 cmd += ['/usr/bin/env', 'rucio', '-v', 'upload']
0185 cmd += ['--rse', fspec.ddmendpoint]
0186
0187 if fspec.scope:
0188 cmd.extend(['--scope', fspec.scope])
0189 if fspec.guid:
0190 cmd.extend(['--guid', fspec.guid])
0191
0192 if no_register:
0193 cmd.append('--no-register')
0194
0195 if summary:
0196 cmd.append('--summary')
0197
0198 if fspec.turl:
0199 cmd.extend(['--pfn', fspec.turl])
0200
0201 cmd += [fspec.surl]
0202
0203 rcode, stdout, stderr = execute(" ".join(cmd), **kwargs)
0204
0205 if rcode:
0206 error = resolve_common_transfer_errors(stderr, is_stagein=False)
0207 fspec.status = 'failed'
0208 fspec.status_code = error.get('rcode')
0209 raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))
0210
0211 if summary:
0212 cwd = fspec.workdir or kwargs.get('workdir') or '.'
0213 path = os.path.join(cwd, 'rucio_upload.json')
0214 if not os.path.exists(path):
0215 logger.error('Failed to resolve Rucio summary JSON, wrong path? file=%s', path)
0216 else:
0217 with open(path, 'rb') as f:
0218 summary = json.load(f)
0219 dat = summary.get("%s:%s" % (fspec.scope, fspec.lfn)) or {}
0220 fspec.turl = dat.get('pfn')
0221
0222
0223 adler32 = dat.get('adler32')
0224 if fspec.checksum.get('adler32') and adler32 and fspec.checksum.get('adler32') != adler32:
0225 raise PilotException("Failed to stageout: CRC mismatched", code=ErrorCodes.PUTADMISMATCH, state='AD_MISMATCH')
0226
0227 fspec.status_code = 0
0228 fspec.status = 'transferred'
0229
0230 return files