File indexing completed on 2026-04-11 08:41:05
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 import re
0012 import glob
0013 from time import sleep
0014
0015 from pilot.common.errorcodes import ErrorCodes
0016 from pilot.util.container import execute
0017 from pilot.util.filehandling import copy
0018
0019 import logging
0020 logger = logging.getLogger(__name__)
0021
0022 errors = ErrorCodes()
0023
0024
def get_analysis_trf(transform, workdir):
    """
    Prepare to download the user analysis transform with curl.

    The function will verify the download location from a known list of hosts.
    If a Harvester work directory is set (HARVESTER_WORKDIR), any staged
    jobO tarballs are first copied into the work directory (best-effort).

    :param transform: full trf path (url) (string).
    :param workdir: work directory (string).
    :return: exit code (int), diagnostics (string), transform_name (string).
    """

    ec = 0
    diagnostics = ""

    # Harvester mode: job option tarballs are pre-staged in HARVESTER_WORKDIR
    harvester_workdir = os.environ.get('HARVESTER_WORKDIR')
    if harvester_workdir is not None:
        search_pattern = "%s/jobO.*.tar.gz" % harvester_workdir
        logger.debug("search_pattern - %s" % search_pattern)
        for jobopt_file in glob.glob(search_pattern):
            logger.debug("jobopt_file = %s workdir = %s" % (jobopt_file, workdir))
            try:
                copy(jobopt_file, workdir)
            except Exception as e:
                # best-effort: a failed tarball copy should not abort the trf download
                logger.error("could not copy file %s to %s : %s" % (jobopt_file, workdir, e))

    # the transform name is the last path component of the URL
    if '/' in transform:
        transform_name = transform.split('/')[-1]
    else:
        logger.warning('did not detect any / in %s (using full transform name)' % transform)
        transform_name = transform

    # skip the download if the script is already present in the work directory
    if os.path.exists(os.path.join(workdir, transform_name)):
        logger.info('script %s is already available - no need to download again' % transform_name)
        return ec, diagnostics, transform_name

    original_base_url = ""

    # verify the transform origin against the known list of base URLs
    for base_url in get_valid_base_urls():
        if transform.startswith(base_url):
            original_base_url = base_url
            break

    if original_base_url == "":
        diagnostics = "invalid base URL: %s" % transform
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    # try all alternative download locations, starting with the original one
    status = False
    for base_url in get_valid_base_urls(order=original_base_url):
        # plain string replacement: re.sub would treat URL characters
        # (e.g. '.' and ':') as regex metacharacters
        trf = transform.replace(original_base_url, base_url, 1)
        logger.debug("attempting to download script: %s" % trf)
        status, diagnostics = download_transform(trf, transform_name, workdir)
        if status:
            break

    if not status:
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    logger.info("successfully downloaded script")
    path = os.path.join(workdir, transform_name)

    # make the downloaded script executable
    logger.debug("changing permission of %s to 0o755" % path)
    try:
        os.chmod(path, 0o755)
    except Exception as e:
        diagnostics = "failed to chmod %s: %s" % (transform_name, e)
        return errors.CHMODTRF, diagnostics, ""

    return ec, diagnostics, transform_name
0096
0097
def get_valid_base_urls(order=None):
    """
    Return a list of valid base URLs from where the user analysis transform may be downloaded from.
    If order is defined, return given item first.
    E.g. order=http://atlpan.web.cern.ch/atlpan -> ['http://atlpan.web.cern.ch/atlpan', ...]
    NOTE: the URL list may be out of date.

    :param order: base URL to be placed first in the returned list (string).
    :return: valid base URLs (list).
    """

    known_urls = ["https://storage.googleapis.com/drp-us-central1-containers",
                  "http://pandaserver-doma.cern.ch:25080/trf/user"]

    if not order:
        return known_urls

    # place the preferred URL first, followed by the remaining known URLs
    return [order] + [url for url in known_urls if url != order]
0122
0123
def download_transform(url, transform_name, workdir):
    """
    Download the transform from the given url.

    In Harvester mode (HARVESTER_WORKDIR set) the transform is instead
    copied from the Harvester work directory and the curl retry loop is
    skipped (max_trials is set to 0).

    :param url: download URL with path to transform (string).
    :param transform_name: trf name (string).
    :param workdir: work directory (string).
    :return: status (Boolean), diagnostics (string).
    """

    status = False
    diagnostics = ""
    path = os.path.join(workdir, transform_name)
    # -f makes curl exit non-zero on HTTP errors (e.g. 404); without it the
    # server's error page would be saved as the transform and reported as success
    cmd = 'curl -sS -f \"%s\" > %s' % (url, path)
    trial = 1
    max_trials = 3

    # Harvester mode: copy the transform from the local Harvester work dir
    harvester_workdir = os.environ.get('HARVESTER_WORKDIR')
    if harvester_workdir is not None:
        # skip the curl download loop below
        max_trials = 0
        source_path = os.path.join(harvester_workdir, transform_name)
        try:
            copy(source_path, path)
            status = True
        except Exception as error:
            status = False
            diagnostics = "Failed to copy file %s to %s : %s" % (source_path, path, error)
            logger.error(diagnostics)

    # try to download the transform, with a 60 s pause between attempts
    while trial <= max_trials:
        logger.info("executing command [trial %d/%d]: %s" % (trial, max_trials, cmd))

        exit_code, stdout, stderr = execute(cmd, mute=True)
        if not stdout:
            stdout = "(None)"
        if exit_code != 0:
            diagnostics = "curl command failed: %d, %s, %s" % (exit_code, stdout, stderr)
            logger.warning(diagnostics)
            if trial == max_trials:
                logger.fatal('could not download transform: %s' % stdout)
                status = False
                break
            else:
                logger.info("will try again after 60 s")
                sleep(60)
        else:
            logger.info("curl command returned: %s" % stdout)
            status = True
            break
        trial += 1

    return status, diagnostics