File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import os
0012 import logging
0013 from pilot.info import infosys
0014 import subprocess
0015 import re
0016
0017 try:
0018 from google.cloud import storage
0019 except Exception:
0020 pass
0021 try:
0022 import pathlib
0023 except Exception:
0024 pathlib = None
0025
0026 from .common import resolve_common_transfer_errors
0027 from pilot.common.errorcodes import ErrorCodes
0028 from pilot.common.exception import PilotException
0029 from pilot.util.config import config
0030
logger = logging.getLogger(__name__)
errors = ErrorCodes()

# Copytool capability flags — presumably consulted by the pilot's transfer
# machinery when selecting/configuring this copytool (TODO confirm against caller):
# replicas need not be resolved, but input/output protocols must be provided.
require_replicas = False
require_input_protocols = True
require_protocols = True

# URL schemes this copytool accepts.
allowed_schemas = ['gs', 'srm', 'gsiftp', 'https', 'davs', 'root']
0039
0040
def is_valid_for_copy_in(files):
    """Report whether the given files are eligible for stage-in (always True for this copytool)."""
    return True
0043
0044
def is_valid_for_copy_out(files):
    """Report whether the given files are eligible for stage-out (always True for this copytool)."""
    return True
0047
0048
def resolve_surl(fspec, protocol, ddmconf, **kwargs):
    """
    Get the final destination SURL for a file to be transferred to the objectstore.

    Can be customized at the level of a specific copytool.

    :param fspec: file spec data.
    :param protocol: suggested protocol (dictionary with 'endpoint' and 'path' entries).
    :param ddmconf: full ddm storage data.
    :raises PilotException: if the ddm endpoint named by fspec cannot be resolved.
    :return: dictionary {'surl': surl}.
    """

    # the panda queue name becomes part of the remote path; fall back to an
    # empty string when the info system is unavailable or unset
    try:
        pandaqueue = infosys.pandaqueue
    except Exception:
        pandaqueue = ""
    if pandaqueue is None:
        pandaqueue = ""

    ddm = ddmconf.get(fspec.ddmendpoint)
    if not ddm:
        raise PilotException('failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint)

    # expand the #{pandaid} placeholder; use get() so a missing PANDAID
    # environment variable does not raise KeyError
    dataset = fspec.dataset
    if dataset:
        dataset = dataset.replace("#{pandaid}", os.environ.get('PANDAID', ''))
    else:
        dataset = ""

    remote_path = os.path.join(protocol.get('path', ''), pandaqueue, dataset)
    surl = protocol.get('endpoint', '') + remote_path
    logger.info('For GCS bucket, set surl=%s', surl)

    return {'surl': surl}
0085
0086
def copy_in(files, **kwargs):
    """
    Download given files from a GCS bucket.

    :param files: list of `FileSpec` objects.
    :raise: PilotException in case of controlled error.
    :return: the list of files (with status fields updated).
    """

    for fspec in files:
        destination = fspec.workdir or kwargs.get('workdir') or '.'
        local_path = os.path.join(destination, fspec.lfn)
        logger.info('downloading surl=%s to local file %s', fspec.surl, local_path)

        status, diagnostics = download_file(local_path, fspec.surl, object_name=fspec.lfn)
        if status:
            # mark this file as successfully staged in and move on
            fspec.status_code = 0
            fspec.status = 'transferred'
            continue

        # translate the raw diagnostics into a standard transfer error and abort
        error = resolve_common_transfer_errors(diagnostics, is_stagein=True)
        fspec.status = 'failed'
        fspec.status_code = error.get('rcode')
        raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

    return files
0112
0113
def download_file(path, surl, object_name=None):
    """
    Download a file from a GCS bucket.

    :param path: path to the local file after download (string).
    :param surl: remote path (string).
    :param object_name: GCS object name. If not specified then the file name from path is used.
    :return: True if file was downloaded (else False), diagnostics (string).
    """

    # object_name is kept for interface compatibility/diagnostics; the blob
    # itself is addressed through surl
    if object_name is None:
        object_name = os.path.basename(path)

    try:
        client = storage.Client()
        # write to the requested local path; the previous code opened the bare
        # object name, which placed the file relative to the current working
        # directory instead of the caller-supplied destination
        with open(path, mode="wb") as downloaded_file:
            client.download_blob_to_file(surl, downloaded_file)
    except Exception as error:
        diagnostics = 'exception caught in gs client: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics

    return True, ""
0139
0140
def copy_out(files, **kwargs):
    """
    Upload given files to GCS storage.

    The destination bucket and remote path are parsed from the turl of the
    first file; all files are expected to share the same destination.

    :param files: list of `FileSpec` objects.
    :param kwargs: must contain 'workdir', the local directory holding the files.
    :raise: PilotException in case of controlled error.
    :return: the list of files (with status fields updated).
    """

    workdir = kwargs.pop('workdir')

    if len(files) > 0:
        fspec = files[0]

        reobj = re.match(r'gs://([^/]*)/(.*)', fspec.turl)
        if not reobj:
            # a non gs:// turl would previously crash with AttributeError;
            # fail with a controlled stage-out error instead
            diagnostics = 'cannot parse gs bucket from turl=%s' % fspec.turl
            logger.warning(diagnostics)
            raise PilotException(diagnostics, code=errors.STAGEOUTFAILED, state='failed')
        (bucket, remote_path) = reobj.groups()

    for fspec in files:
        logger.info('Going to process fspec.turl=%s', fspec.turl)

        lfn = fspec.lfn.strip(' ')
        dataset = fspec.dataset or ''  # dataset may be None; guard endswith()
        if lfn == '/' or dataset.endswith('/'):
            # a directory-style lfn/dataset means: upload the whole workdir
            logfiles = os.listdir(workdir)
        else:
            logfiles = [lfn]

        for logfile in logfiles:
            path = os.path.join(workdir, logfile)
            if not os.path.exists(path):
                diagnostics = 'local output file does not exist: %s' % path
                logger.warning(diagnostics)
                fspec.status = 'failed'
                fspec.status_code = errors.STAGEOUTFAILED
                raise PilotException(diagnostics, code=fspec.status_code, state=fspec.status)

            content_type = _get_content_type(path, logfile)
            object_name = os.path.join(remote_path, logfile)
            logger.info('uploading %s to bucket=%s using object name=%s', path, bucket, object_name)
            status, diagnostics = upload_file(path, bucket, object_name=object_name, content_type=content_type)

            if not status:
                # translate the raw diagnostics into a standard transfer error
                error = resolve_common_transfer_errors(diagnostics, is_stagein=False)
                fspec.status = 'failed'
                fspec.status_code = error.get('rcode')
                raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        fspec.status = 'transferred'
        fspec.status_code = 0

    return files


def _get_content_type(path, logfile):
    """
    Determine the content type to use when uploading the given file.

    Known pilot/payload text logs are forced to text/plain; for any other file
    /bin/file is asked for the MIME type (best effort, None on failure).

    :param path: full local path of the file (string).
    :param logfile: bare file name (string).
    :return: content type (string or None).
    """

    if logfile == config.Pilot.pilotlog or logfile == config.Payload.payloadstdout or logfile == config.Payload.payloadstderr:
        logger.info('Change the file=%s content-type to text/plain', logfile)
        return "text/plain"

    content_type = None
    try:
        result = subprocess.check_output(["/bin/file", "-i", "-b", "-L", path])
        if not isinstance(result, str):
            result = result.decode('utf-8')
        if result.find(';') > 0:
            content_type = result.split(';')[0]
            logger.info('Change the file=%s content-type to %s', logfile, content_type)
    except Exception:
        # best effort only: let the storage client pick a default type
        pass

    return content_type
0208
0209
def upload_file(file_name, bucket, object_name=None, content_type=None):
    """
    Upload a file to a GCS bucket.

    :param file_name: file to upload.
    :param bucket: bucket to upload to (string).
    :param object_name: GCS object name. If not specified then file_name is used.
    :param content_type: MIME content type to set on the blob (string or None).
    :return: True if file was uploaded (else False), diagnostics (string).
    """

    # default to the local file name and strip any leading slash, which is not
    # part of a GCS object name
    target_name = (object_name if object_name is not None else file_name).lstrip('/')

    try:
        client = storage.Client()
        gs_bucket = client.get_bucket(bucket)

        logger.info('uploading a file to bucket=%s in full path=%s in content_type=%s', bucket, target_name, content_type)
        blob = gs_bucket.blob(target_name)
        blob.upload_from_filename(filename=file_name, content_type=content_type)

        if file_name.endswith(config.Pilot.pilotlog):
            # publish the pilot log URL through the GTAG environment variable
            url_pilotlog = blob.public_url
            os.environ['GTAG'] = url_pilotlog
            logger.debug("Set envvar GTAG with the pilotLot URL=%s", url_pilotlog)
    except Exception as error:
        diagnostics = 'exception caught in gs client: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics

    return True, ""