Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /pilot2/pilot/user/generic/setup.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2017
0009 
0010 import os
0011 import re
0012 import glob
0013 from time import sleep
0014 
0015 from pilot.common.errorcodes import ErrorCodes
0016 from pilot.util.container import execute
0017 from pilot.util.filehandling import copy
0018 
0019 import logging
0020 logger = logging.getLogger(__name__)
0021 
0022 errors = ErrorCodes()
0023 
0024 
def get_analysis_trf(transform, workdir):
    """
    Prepare to download the user analysis transform with curl.
    The function will verify the download location from a known list of hosts.

    Steps:
    1. If $HARVESTER_WORKDIR is set, copy any jobO.*.tar.gz files found there to the work directory
       (best effort - a failed copy is only logged).
    2. Derive the transform name from the last path component of the given transform.
    3. Return immediately if a file with that name already exists in the work directory.
    4. Verify that the transform starts with one of the URLs returned by get_valid_base_urls().
    5. Attempt the download from the original base URL first, then from the remaining valid base URLs.
    6. Make the downloaded script executable (0o755).

    :param transform: full trf path (url) (string).
    :param workdir: work directory (string).
    :return: exit code (int), diagnostics (string), transform_name (string)
             (on failure, the exit code is an ErrorCodes value and transform_name is an empty string).
    """

    ec = 0
    diagnostics = ""

    # test if $HARVESTER_WORKDIR is set
    harvester_workdir = os.environ.get('HARVESTER_WORKDIR')
    if harvester_workdir is not None:
        search_pattern = "%s/jobO.*.tar.gz" % harvester_workdir
        logger.debug("search_pattern - %s", search_pattern)
        for jobopt_file in glob.glob(search_pattern):
            logger.debug("jobopt_file = %s workdir = %s", jobopt_file, workdir)
            try:
                copy(jobopt_file, workdir)
            except Exception as e:
                # deliberate best effort: a missing job options tarball must not abort the setup
                logger.error("could not copy file %s to %s : %s", jobopt_file, workdir, e)

    if '/' in transform:
        transform_name = transform.split('/')[-1]
    else:
        logger.warning('did not detect any / in %s (using full transform name)', transform)
        transform_name = transform

    # an empty name (empty transform, or a transform ending with '/') would make the path below resolve to the
    # work directory itself, which always "exists" and would wrongly be reported as an already downloaded script
    if not transform_name:
        diagnostics = "could not extract a transform name from: %s" % transform
        logger.warning(diagnostics)
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    # is the command already available? (e.g. if already downloaded by a preprocess/main process step)
    if os.path.exists(os.path.join(workdir, transform_name)):
        logger.info('script %s is already available - no need to download again', transform_name)
        return ec, diagnostics, transform_name

    # verify the base URL
    original_base_url = ""
    for base_url in get_valid_base_urls():
        if transform.startswith(base_url):
            original_base_url = base_url
            break

    if original_base_url == "":
        diagnostics = "invalid base URL: %s" % transform
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    # everything after the verified base URL; kept as-is when switching to a backup base URL
    relative_path = transform[len(original_base_url):]

    # try to download from the required location, if not - switch to backup
    status = False
    for base_url in get_valid_base_urls(order=original_base_url):
        # plain prefix swap: the URLs are literal strings, not regular expressions, so a regex substitution
        # could match unintended characters ('.' matches anything) or rewrite more than the leading base URL
        trf = base_url + relative_path
        logger.debug("attempting to download script: %s", trf)
        status, diagnostics = download_transform(trf, transform_name, workdir)
        if status:
            break

    if not status:
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    logger.info("successfully downloaded script")
    path = os.path.join(workdir, transform_name)
    logger.debug("changing permission of %s to 0o755", path)
    try:
        os.chmod(path, 0o755)  # Python 2/3
    except Exception as e:
        diagnostics = "failed to chmod %s: %s" % (transform_name, e)
        return errors.CHMODTRF, diagnostics, ""

    return ec, diagnostics, transform_name
0096 
0097 
def get_valid_base_urls(order=None):
    """
    Build the list of base URLs that the user analysis transform is allowed to be downloaded from.
    When order is given, that URL is placed first and the remaining known URLs follow in their default order.
    E.g. order=http://atlpan.web.cern.ch/atlpan -> ['http://atlpan.web.cern.ch/atlpan', ...]
    NOTE: the URL list may be out of date.

    :param order: base URL to be listed first (string); None or an empty string keeps the default order.
    :return: valid base URLs (list).
    """

    known_urls = ["https://storage.googleapis.com/drp-us-central1-containers",
                  "http://pandaserver-doma.cern.ch:25080/trf/user"]

    if not order:
        return known_urls

    # requested URL first, followed by every other known URL (the requested one is never listed twice)
    return [order] + [candidate for candidate in known_urls if candidate != order]
0122 
0123 
def download_transform(url, transform_name, workdir):
    """
    Download the transform from the given url.

    If $HARVESTER_WORKDIR is set, curl is skipped altogether and the transform is instead copied from that
    directory into the work directory. Otherwise curl is executed up to three times, with a 60 s pause between
    failed attempts.

    :param url: download URL with path to transform (string).
    :param transform_name: trf name (string).
    :param workdir: work directory (string).
    :return: status (bool, True if the transform was copied/downloaded), diagnostics (string, empty on success).
    """

    # shell quoting helper (local import keeps the function usable with both Python 2 and 3)
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    path = os.path.join(workdir, transform_name)

    # test if $HARVESTER_WORKDIR is set
    harvester_workdir = os.environ.get('HARVESTER_WORKDIR')
    if harvester_workdir is not None:
        # no curl in this case - the transform is expected to be present in the harvester work directory
        source_path = os.path.join(harvester_workdir, transform_name)
        try:
            copy(source_path, path)
        except Exception as error:
            diagnostics = "Failed to copy file %s to %s : %s" % (source_path, path, error)
            logger.error(diagnostics)
            return False, diagnostics
        return True, ""

    # the URL and the path are interpolated into a shell command line, so both are quoted to prevent word
    # splitting and command substitution; --fail makes curl return a non-zero exit code on HTTP errors (>= 400)
    # instead of saving the server's error page as the transform
    cmd = 'curl -sS --fail %s > %s' % (quote(url), quote(path))
    max_trials = 3
    diagnostics = ""

    # try to download the trf a maximum of 3 times
    for trial in range(1, max_trials + 1):
        logger.info("executing command [trial %d/%d]: %s", trial, max_trials, cmd)

        # NOTE(review): mute=True presumably suppresses execute()'s own logging of the command - confirm
        exit_code, stdout, stderr = execute(cmd, mute=True)
        if not stdout:
            stdout = "(None)"

        if exit_code == 0:
            logger.info("curl command returned: %s", stdout)
            # do not report diagnostics from earlier failed trials together with a successful download
            return True, ""

        # Analyze exit code / output
        diagnostics = "curl command failed: %d, %s, %s" % (exit_code, stdout, stderr)
        logger.warning(diagnostics)
        if trial == max_trials:
            logger.fatal('could not download transform: %s', stdout)
        else:
            logger.info("will try again after 60 s")
            sleep(60)

    return False, diagnostics