Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2021
0009 # - Shuwei
0010 
0011 import os
0012 import logging
0013 from pilot.info import infosys
0014 import subprocess
0015 import re
0016 
0017 try:
0018     from google.cloud import storage
0019 except Exception:
0020     pass
0021 try:
0022     import pathlib  # Python 3
0023 except Exception:
0024     pathlib = None
0025 
0026 from .common import resolve_common_transfer_errors
0027 from pilot.common.errorcodes import ErrorCodes
0028 from pilot.common.exception import PilotException
0029 from pilot.util.config import config
0030 
0031 logger = logging.getLogger(__name__)
0032 errors = ErrorCodes()
0033 
0034 require_replicas = False    ## indicates if given copytool requires input replicas to be resolved
0035 require_input_protocols = True    ## indicates if given copytool requires input protocols and manual generation of input replicas
0036 require_protocols = True  ## indicates if given copytool requires protocols to be resolved first for stage-out
0037 
0038 allowed_schemas = ['gs', 'srm', 'gsiftp', 'https', 'davs', 'root']
0039 
0040 
def is_valid_for_copy_in(files):
    """Return True: this copytool performs no stage-in validation (yet)."""
    return True  ## FIX ME LATER
0043 
0044 
def is_valid_for_copy_out(files):
    """Return True: this copytool performs no stage-out validation (yet)."""
    return True  ## FIX ME LATER
0047 
0048 
def resolve_surl(fspec, protocol, ddmconf, **kwargs):
    """
    Get the final destination SURL for a file to be transferred to the Objectstore.

    Can be customized at the level of a specific copytool.

    :param fspec: file spec data.
    :param protocol: suggested protocol (dictionary with 'path' and 'endpoint' keys).
    :param ddmconf: full ddm storage data (dictionary keyed by ddm endpoint name).
    :raise: PilotException if fspec.ddmendpoint cannot be resolved in ddmconf.
    :return: dictionary {'surl': surl}.
    """

    # infosys may not be fully initialized at this point; fall back to ""
    try:
        pandaqueue = infosys.pandaqueue
    except Exception:
        pandaqueue = ""
    if pandaqueue is None:
        pandaqueue = ""

    ddm = ddmconf.get(fspec.ddmendpoint)
    if not ddm:
        raise PilotException('failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint)

    dataset = fspec.dataset
    if dataset:
        # resolve the #{pandaid} placeholder; use "" instead of crashing with
        # a KeyError if PANDAID is not set in the environment
        dataset = dataset.replace("#{pandaid}", os.environ.get('PANDAID', ''))
    else:
        dataset = ""

    remote_path = os.path.join(protocol.get('path', ''), pandaqueue, dataset)
    surl = protocol.get('endpoint', '') + remote_path
    logger.info('For GCS bucket, set surl=%s', surl)

    # example:
    #   protocol = {u'path': u'/atlas-eventservice', u'endpoint': u's3://s3.cern.ch:443/', u'flavour': u'AWS-S3-SSL', u'id': 175}
    #   surl = 's3://s3.cern.ch:443//atlas-eventservice/EventService_premerge_24706191-5013009653-24039149400-322-5.tar'
    return {'surl': surl}
0085 
0086 
def copy_in(files, **kwargs):
    """
    Download given files from a GCS bucket.

    :param files: list of `FileSpec` objects.
    :raise: PilotException in case of controlled error.
    :return: the list of files, with status/status_code updated on each entry.
    """

    for fspec in files:
        workdir = fspec.workdir or kwargs.get('workdir') or '.'
        destination = os.path.join(workdir, fspec.lfn)
        logger.info('downloading surl=%s to local file %s', fspec.surl, destination)

        status, diagnostics = download_file(destination, fspec.surl, object_name=fspec.lfn)
        if status:
            fspec.status_code = 0
            fspec.status = 'transferred'
        else:
            # an error occurred: translate it into a transfer error and abort
            error = resolve_common_transfer_errors(diagnostics, is_stagein=True)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

    return files
0112 
0113 
def download_file(path, surl, object_name=None):
    """
    Download a single object from a GCS bucket to a local file.

    :param path: path to the local destination file (string).
    :param surl: remote object URL, e.g. gs://bucket/name (string).
    :param object_name: GCS object name; defaults to the basename of path (string).
    :return: True if the file was downloaded (else False), diagnostics (string).
    """

    # if object_name was not specified, use file name from path
    if object_name is None:
        object_name = os.path.basename(path)

    try:
        client = storage.Client()
        # write to the requested destination; the previous version wrote to
        # object_name relative to the CWD, which only matched `path` when the
        # pilot work dir happened to be the CWD. Using builtin open() also
        # avoids the guarded pathlib import (which may be None on Python 2).
        with open(path, mode="wb") as downloaded_file:
            client.download_blob_to_file(surl, downloaded_file)
    except Exception as error:
        diagnostics = 'exception caught in gs client: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics

    return True, ""
0139 
0140 
def copy_out(files, **kwargs):
    """
    Upload given files to GS storage.

    :param files: list of `FileSpec` objects.
    :param kwargs: must contain 'workdir', the local directory holding the files.
    :raise: PilotException in case of controlled error.
    :return: the list of files, with status/status_code updated on each entry.
    """

    workdir = kwargs.pop('workdir')

    if len(files) > 0:
        # all files go to the same bucket; extract the bucket and the remote
        # path prefix from the first turl, e.g. gs://bucket/some/path
        reobj = re.match(r'gs://([^/]*)/(.*)', files[0].turl)
        if reobj is None:
            # previously this crashed with an AttributeError; fail in a controlled way instead
            diagnostics = 'cannot extract bucket from turl=%s' % files[0].turl
            logger.warning(diagnostics)
            raise PilotException(diagnostics, code=errors.STAGEOUTFAILED, state='failed')
        (bucket, remote_path) = reobj.groups()

    for fspec in files:
        logger.info('Going to process fspec.turl=%s', fspec.turl)

        lfn = fspec.lfn.strip(' ')
        # fspec.dataset may be None; guard before endswith()
        dataset = fspec.dataset or ''
        if lfn == '/' or dataset.endswith('/'):
            # upload everything in the work dir (log files such as
            # pilotlog.txt, payload.stdout, payload.stderr)
            logfiles = os.listdir(workdir)
        else:
            logfiles = [lfn]

        for logfile in logfiles:
            path = os.path.join(workdir, logfile)
            if not os.path.exists(path):
                diagnostics = 'local output file does not exist: %s' % path
                logger.warning(diagnostics)
                fspec.status = 'failed'
                fspec.status_code = errors.STAGEOUTFAILED
                raise PilotException(diagnostics, code=fspec.status_code, state=fspec.status)

            if logfile == config.Pilot.pilotlog or logfile == config.Payload.payloadstdout or logfile == config.Payload.payloadstderr:
                content_type = "text/plain"
                logger.info('Change the file=%s content-type to text/plain', logfile)
            else:
                content_type = None
                try:
                    # ask file(1) for the MIME type, e.g. "text/plain; charset=us-ascii"
                    result = subprocess.check_output(["/bin/file", "-i", "-b", "-L", path])
                    if not isinstance(result, str):
                        result = result.decode('utf-8')
                    if result.find(';') > 0:
                        content_type = result.split(';')[0]
                        logger.info('Change the file=%s content-type to %s', logfile, content_type)
                except Exception:
                    # best effort only: upload without an explicit content type
                    pass

            object_name = os.path.join(remote_path, logfile)
            logger.info('uploading %s to bucket=%s using object name=%s', path, bucket, object_name)
            status, diagnostics = upload_file(path, bucket, object_name=object_name, content_type=content_type)

            if not status:  ## an error occurred
                # create new error code(s) in ErrorCodes.py and set it/them in resolve_common_transfer_errors()
                error = resolve_common_transfer_errors(diagnostics, is_stagein=False)
                fspec.status = 'failed'
                fspec.status_code = error.get('rcode')
                raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        fspec.status = 'transferred'
        fspec.status_code = 0

    return files
0208 
0209 
def upload_file(file_name, bucket, object_name=None, content_type=None):
    """
    Upload a file to a GCS bucket.

    :param file_name: file to upload (string).
    :param bucket: bucket to upload to (string).
    :param object_name: GCS object name; if not specified then file_name is used (string).
    :param content_type: MIME content type to set on the object, or None to let GCS decide (string).
    :return: True if file was uploaded (else False), diagnostics (string).
    """

    # if GCS object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name

    # upload the file
    try:
        client = storage.Client()
        gs_bucket = client.get_bucket(bucket)
        # remove any leading slash(es) in object_name
        object_name = object_name.lstrip('/')
        logger.info('uploading a file to bucket=%s in full path=%s in content_type=%s', bucket, object_name, content_type)
        blob = gs_bucket.blob(object_name)
        blob.upload_from_filename(filename=file_name, content_type=content_type)
        if file_name.endswith(config.Pilot.pilotlog):
            # expose the public pilot log URL for later reporting via the GTAG env var
            url_pilotlog = blob.public_url
            os.environ['GTAG'] = url_pilotlog
            logger.debug("Set envvar GTAG with the pilotlog URL=%s", url_pilotlog)  # fixed 'pilotLot' typo
    except Exception as error:
        diagnostics = 'exception caught in gs client: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics

    return True, ""