Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:05

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2020
0009 
0010 import os
0011 import re
0012 import glob
0013 from time import sleep
0014 
0015 from pilot.common.errorcodes import ErrorCodes
0016 from pilot.common.exception import NoSoftwareDir
0017 from pilot.info import infosys
0018 from pilot.util.container import execute
0019 from pilot.util.filehandling import read_file, write_file, copy
0020 
0021 from .metadata import get_file_info_from_xml
0022 
import logging
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# ErrorCodes instance from which the functions in this file read pilot error codes (e.g. errors.TRFDOWNLOADFAILURE)
errors = ErrorCodes()
0027 
0028 
def get_file_system_root_path():
    """
    Return the root path of the local file system used for software lookup.

    The value of the ATLAS_SW_BASE environment variable is returned when it is set (a site can export it when
    the file system root is not in the usual place, e.g. on an HPC). Otherwise the default "/cvmfs" is returned.

    :return: path (string)
    """

    default_root = '/cvmfs'
    return os.getenv('ATLAS_SW_BASE', default_root)
0039 
0040 
def should_pilot_prepare_setup(noexecstrcnv, jobpars, imagename=None):
    """
    Decide whether the pilot itself should add the setup to the payload command.

    The answer is False when a container image name is given, and also when noExecStrCnv is set and jobPars
    already contains "asetup.sh". In every other case the pilot prepares the setup.

    :param noexecstrcnv: boolean.
    :param jobpars: job parameters (string).
    :param imagename: container image (string).
    :return: boolean.
    """

    # a stand-alone / user defined container brings its own setup
    if imagename:
        return False

    if not noexecstrcnv:
        logger.info("pilot will prepare the setup")
        return True

    if "asetup.sh" in jobpars:
        logger.info("asetup will be taken from jobPars")
        return False

    logger.info("noExecStrCnv is set but asetup command was not found in jobPars (pilot will prepare asetup)")
    return True
0069 
0070 
def get_alrb_export(add_if=False):
    """
    Build the shell export command for ATLAS_LOCAL_ROOT_BASE.

    The command is only produced when the directory <file system root>/atlas.cern.ch/repo exists on the local
    node; otherwise an empty string is returned.

    With add_if=True the export is wrapped in a shell test so that it only takes effect when the variable is
    not already set, i.e.
    if [ -z "$ATLAS_LOCAL_ROOT_BASE" ]; then export ATLAS_LOCAL_ROOT_BASE=<...>/ATLASLocalRootBase; fi;

    :param add_if: Boolean. True means that an if statement will be placed around the export.
    :return: export command (string), or empty string if the repo path does not exist.
    """

    repo_path = "%s/atlas.cern.ch/repo" % get_file_system_root_path()
    if not os.path.exists(repo_path):
        return ""

    export_cmd = "export ATLAS_LOCAL_ROOT_BASE=%s/ATLASLocalRootBase;" % repo_path
    if add_if:
        export_cmd = 'if [ -z "$ATLAS_LOCAL_ROOT_BASE" ]; then %s fi;' % export_cmd

    return export_cmd
0088 
0089 
def get_asetup(asetup=True, alrb=False, add_if=False):
    """
    Assemble the setup command string for asetup.

    When the ALRB export is available (see get_alrb_export()), the returned string starts with that export. Unless
    alrb=True, the sourcing of atlasLocalSetup.sh is appended, and finally (if asetup=True) the sourcing of the
    asetup script itself.

    When the ALRB export is not available, the software directory is taken from infosys.queuedata.appdir, with
    the VO_ATLAS_SW_DIR environment variable as fallback, and the asetup script is sourced from there (if
    asetup=True). An empty string is returned if no software directory is known.

    :param asetup: Boolean. True value means that the pilot should include the asetup command.
    :param alrb: Boolean. True value means that the function should return special setup used with ALRB and containers.
    :param add_if: Boolean. True means that an if statement will be placed around the export.
    :raises: NoSoftwareDir if appdir does not exist.
    :return: setup command, e.g. source <path>/asetup.sh (string).
    """

    alrb_export = get_alrb_export(add_if=add_if)
    if alrb_export != "":
        if alrb:
            # only the export is wanted in this case
            return alrb_export

        pieces = [alrb_export, "source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;"]
        if asetup:
            pieces.append("source $AtlasSetup/scripts/asetup.sh")
        return "".join(pieces)

    # no ALRB available; locate the software directory instead
    try:  # infosys might not have been initiated
        appdir = infosys.queuedata.appdir
    except Exception:
        appdir = ""
    if appdir == "":
        appdir = os.environ.get('VO_ATLAS_SW_DIR', '')
    if appdir == "":
        return ""

    # the software directory must exist
    if not os.path.exists(appdir):
        msg = 'appdir does not exist: %s' % appdir
        logger.warning(msg)
        raise NoSoftwareDir(msg)

    return "source %s/scripts/asetup.sh" % appdir if asetup else ""
0132 
0133 
def get_asetup_options(release, homepackage):
    """
    Build the comma separated option string to be passed to asetup.

    A leading "Atlas-" is stripped from the release. For a user analysis homePackage (one containing
    "AnalysisTransforms") the options are the release (only if the remaining homePackage is empty or the release
    is not of the form N.N.N) followed by the "_"-separated parts of the remaining homePackage. For any other
    homePackage, the options are its "/"-separated parts, followed by the release unless it is already present.
    "notest" is always added, and "fast" is added if ATLAS_FAST_ASETUP is defined in the environment.

    :param release: ATLAS release string.
    :param homepackage: ATLAS homePackage string.
    :return: asetup options (string).
    """

    release = re.sub('^Atlas-', '', release)
    options = []

    if 'AnalysisTransforms' in homepackage:
        # user analysis homePackage
        package = re.sub('^AnalysisTransforms-*', '', homepackage)
        numbered_release = re.search(r'^\d+\.\d+\.\d+$', release) is not None
        if (package == '' or not numbered_release) and release != "":
            options.append(release)
        if package != '':
            options.extend(package.split('_'))
    else:
        options.extend(homepackage.split('/'))
        if not (release in homepackage or release in options):
            options.append(release)

    # not necessary for late releases but harmless to add
    options.append('notest')

    # the fast option is enabled through a locally defined environment variable
    if os.environ.get("ATLAS_FAST_ASETUP") is not None:
        options.append('fast')

    return ','.join(options)
0170 
0171 
def is_standard_atlas_job(release):
    """
    Tell whether the release string denotes a standard ATLAS job.

    The only criterion is that the release string starts with 'Atlas-'.

    :param release: Release value (string).
    :return: Boolean. True for a standard ATLAS job.
    """

    standard_prefix = 'Atlas-'
    return release.startswith(standard_prefix)
0182 
0183 
def set_inds(dataset):
    """
    Export the INDS environment variable used by runAthena.

    The first entry of the comma separated dataset string that contains neither "DBRelease" nor ".lib." is
    selected. If the selected entry is a non-empty string, os.environ['INDS'] is set to it; otherwise only a
    warning is logged.

    :param dataset: dataset for input files (realDatasetsIn) (string).
    :return:
    """

    candidates = (name for name in dataset.split(',') if "DBRelease" not in name and ".lib." not in name)
    inds = next(candidates, "")

    if inds == "":
        logger.warning("INDS unknown")
        return

    logger.info("setting INDS environmental variable to: %s", inds)
    os.environ['INDS'] = inds
0203 
0204 
def get_analysis_trf(transform, workdir):
    """
    Download the user analysis transform to the work directory and make it executable.

    The transform name is the part of the given URL after the last "/". If a file with that name already exists
    in the work directory, nothing is downloaded. Otherwise the URL must start with one of the base URLs returned
    by get_valid_base_urls(). The download is attempted from that base URL first and then, on failure, from the
    remaining known base URLs. On success the downloaded file gets permission 0o755.

    If $HARVESTER_WORKDIR is set, any jobO.*.tar.gz files found there are first copied to the work directory
    (failures to copy are logged but not fatal).

    :param transform: full trf path (url) (string).
    :param workdir: work directory (string).
    :return: exit code (int), diagnostics (string), transform_name (string). On failure the exit code is
             errors.TRFDOWNLOADFAILURE or errors.CHMODTRF and the transform name is an empty string.
    """

    ec = 0
    diagnostics = ""

    # test if $HARVESTER_WORKDIR is set
    harvester_workdir = os.environ.get('HARVESTER_WORKDIR')
    if harvester_workdir is not None:
        search_pattern = "%s/jobO.*.tar.gz" % harvester_workdir
        logger.debug("search_pattern - %s", search_pattern)
        for jobopt_file in glob.glob(search_pattern):
            logger.debug("jobopt_file = %s workdir = %s", jobopt_file, workdir)
            try:
                copy(jobopt_file, workdir)
            except Exception as error:
                # best effort: a failed copy is reported but does not stop the function
                logger.error("could not copy file %s to %s : %s", jobopt_file, workdir, error)

    if '/' in transform:
        transform_name = transform.split('/')[-1]
    else:
        logger.warning('did not detect any / in %s (using full transform name)', transform)
        transform_name = transform

    # is the command already available? (e.g. if already downloaded by a preprocess/main process step)
    if os.path.exists(os.path.join(workdir, transform_name)):
        logger.info('script %s is already available - no need to download again', transform_name)
        return ec, diagnostics, transform_name

    # verify the base URL
    # NOTE(review): this is a plain string prefix match, so a URL whose host name merely begins with one of the
    # valid base URLs is accepted as well - consider validating the parsed host instead
    original_base_url = ""
    for base_url in get_valid_base_urls():
        if transform.startswith(base_url):
            original_base_url = base_url
            break

    if original_base_url == "":
        diagnostics = "invalid base URL: %s" % transform
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    # the part of the URL that follows the verified base URL; it is kept as is when switching base URL
    # (the base URL is swapped by slicing rather than by regular expression substitution, since the URL contains
    # regex metacharacters such as "." and only the leading occurrence should be replaced)
    relative_path = transform[len(original_base_url):]

    # try to download from the required location, if not - switch to backup
    status = False
    for base_url in get_valid_base_urls(order=original_base_url):
        trf = base_url + relative_path
        logger.debug("attempting to download script: %s", trf)
        status, diagnostics = download_transform(trf, transform_name, workdir)
        if status:
            break

    if not status:
        return errors.TRFDOWNLOADFAILURE, diagnostics, ""

    logger.info("successfully downloaded script")
    path = os.path.join(workdir, transform_name)
    logger.debug("changing permission of %s to 0o755", path)
    try:
        os.chmod(path, 0o755)  # Python 2/3
    except Exception as error:
        diagnostics = "failed to chmod %s: %s" % (transform_name, error)
        return errors.CHMODTRF, diagnostics, ""

    return ec, diagnostics, transform_name
0276 
0277 
def download_transform(url, transform_name, workdir):
    """
    Fetch the transform and store it as <workdir>/<transform_name>.

    If $HARVESTER_WORKDIR is set, the transform is copied from that directory and curl is not used at all.
    Otherwise the transform is downloaded with curl, in at most three attempts with a 60 s pause after a failed
    attempt that is not the last one.

    :param url: download URL with path to transform (string).
    :param transform_name: trf name (string).
    :param workdir: work directory (string).
    :return: status (boolean, True if the transform was copied/downloaded), diagnostics (string).
    """

    status = False
    diagnostics = ""
    path = os.path.join(workdir, transform_name)
    cmd = 'curl -sS "%s" > %s' % (url, path)
    max_trials = 3

    # test if $HARVESTER_WORKDIR is set
    harvester_workdir = os.environ.get('HARVESTER_WORKDIR')
    if harvester_workdir is not None:
        # zero trials means that the curl loop below is skipped
        max_trials = 0
        source_path = os.path.join(harvester_workdir, transform_name)
        try:
            copy(source_path, path)
        except Exception as error:
            status = False
            diagnostics = "Failed to copy file %s to %s : %s" % (source_path, path, error)
            logger.error(diagnostics)
        else:
            status = True

    # try to download the trf a maximum of max_trials times
    for trial in range(1, max_trials + 1):
        logger.info("executing command [trial %d/%d]: %s", trial, max_trials, cmd)

        exit_code, stdout, stderr = execute(cmd, mute=True)
        stdout = stdout or "(None)"

        if exit_code == 0:
            logger.info("curl command returned: %s", stdout)
            status = True
            break

        # Analyze exit code / output
        diagnostics = "curl command failed: %d, %s, %s" % (exit_code, stdout, stderr)
        logger.warning(diagnostics)

        if trial == max_trials:
            # out of attempts
            logger.fatal('could not download transform: %s', stdout)
            status = False
            break

        logger.info("will try again after 60 s")
        sleep(60)

    return status, diagnostics
0333 
0334 
def get_valid_base_urls(order=None):
    """
    List the base URLs from which the user analysis transform may be downloaded.

    If order is given, it is placed first in the returned list, followed by all known base URLs that differ from
    it. E.g. order=http://atlpan.web.cern.ch/atlpan -> ['http://atlpan.web.cern.ch/atlpan', ...]
    NOTE: the URL list may be out of date.

    :param order: base URL to be returned first (string).
    :return: valid base URLs (list).
    """

    known_base_urls = ["http://www.usatlas.bnl.gov",
                       "https://www.usatlas.bnl.gov",
                       "http://pandaserver.cern.ch",
                       "http://atlpan.web.cern.ch/atlpan",
                       "https://atlpan.web.cern.ch/atlpan",
                       "http://classis01.roma1.infn.it",
                       "http://atlas-install.roma1.infn.it"]

    if not order:
        return known_base_urls

    return [order] + [url for url in known_base_urls if url != order]
0364 
0365 
def get_payload_environment_variables(cmd, job_id, task_id, attempt_nr, processing_type, site_name, analysis_job):
    """
    Return a list with the environment variable statements needed by the payload.

    Each list item is a shell statement terminated by ";". Note that PandaID, PanDA_TaskID, INDS and RUCIO_ACCOUNT
    are read from the environment of the pilot (PANDAID, PanDA_TaskID, INDS, RUCIO_ACCOUNT), while job_id and
    task_id are only used for FRONTIER_ID.

    :param cmd: payload execution command (string).
    :param job_id: PanDA job id (string).
    :param task_id: PanDA task id (string).
    :param attempt_nr: PanDA job attempt number (int).
    :param processing_type: processing type (string).
    :param site_name: site name (string).
    :param analysis_job: True for user analysis jobs, False otherwise (boolean).
    :return: list of environment variables needed by the payload.
    """

    env = os.environ
    variables = [
        'export PANDA_RESOURCE=\'%s\';' % site_name,
        'export FRONTIER_ID=\"[%s_%s]\";' % (task_id, job_id),
        'export CMSSW_VERSION=$FRONTIER_ID;',
        'export PandaID=%s;' % env.get('PANDAID', 'unknown'),
        'export PanDA_TaskID=\'%s\';' % env.get('PanDA_TaskID', 'unknown'),
        'export PanDA_AttemptNr=\'%d\';' % attempt_nr,
        'export INDS=\'%s\';' % env.get('INDS', 'unknown'),
    ]

    # Unset ATHENA_PROC_NUMBER if set for event service Merge jobs
    if 'ATHENA_PROC_NUMBER' in env and "Merge_tf" in cmd:
        variables += ['unset ATHENA_PROC_NUMBER;', 'unset ATHENA_CORE_NUMBER;']

    if analysis_job:
        variables.append('export ROOT_TTREECACHE_SIZE=1;')
        # use a single core unless ATHENA_PROC_NUMBER holds an integer
        try:
            ncpus = int(env.get('ATHENA_PROC_NUMBER'))
        except (TypeError, ValueError):
            ncpus = 1
        variables.append('export ROOTCORE_NCPUS=%d;' % ncpus)

    if processing_type != "":
        variables.append('export RUCIO_APPID=\'%s\';' % processing_type)
    else:
        logger.warning("RUCIO_APPID needs job.processingType but it is not set!")
    variables.append('export RUCIO_ACCOUNT=\'%s\';' % env.get('RUCIO_ACCOUNT', 'pilot'))

    return variables
0411 
0412 
def get_writetoinput_filenames(writetofile):
    """
    Extract the writeToFile file name(s).

    The given string consists of "^"-separated entries of the form <file name>:<content>. For every entry that
    contains a ":", the part before the first ":" is returned, with any '.pool.root.' replaced by '.txt.'.
    writeToFile='tmpin_mc16_13TeV.blah:AOD.15760866._000002.pool.root.1'
    -> return ['tmpin_mc16_13TeV.blah']

    :param writetofile: string containing file name information.
    :return: list of file names
    """

    return [entry.split(":")[0].replace('.pool.root.', '.txt.')
            for entry in writetofile.split('^')
            if ':' in entry]
0432 
0433 
def replace_lfns_with_turls(cmd, workdir, filename, infiles, writetofile=""):
    """
    Replace all LFNs with full TURLs in the payload execution command.

    This function is used with direct access in production jobs. Athena requires a full TURL instead of LFN.

    The TURLs are looked up in the metadata file (via get_file_info_from_xml()); the first element of the entry
    found for an LFN is used as its TURL. If writetofile is given, the input file lists named in it (see
    get_writetoinput_filenames()) are rewritten in the work directory as well, with each line whose base name is
    a replaced LFN exchanged for the corresponding TURL and with empty lines dropped.

    If the metadata file does not exist, a warning is logged and the command is returned unchanged.

    :param cmd: payload execution command (string).
    :param workdir: location of metadata file (string).
    :param filename: metadata file name (string).
    :param infiles: list of input files.
    :param writetofile: writeToFile string with the names of the input file lists (string).
    :return: updated cmd (string).
    """

    metadata_path = os.path.join(workdir, filename)
    if not os.path.exists(metadata_path):
        logger.warning("could not find file: %s (cannot locate TURLs for direct access)", filename)
        return cmd

    file_info = get_file_info_from_xml(workdir, filename=filename)

    lfn_to_turl = {}  # { LFN: TURL, ..}
    for lfn in infiles:
        if lfn not in cmd:
            continue
        turl = file_info[lfn][0]
        lfn_to_turl[lfn] = turl
        # leave the command alone if it already contains the TURL
        if turl not in cmd:
            cmd = cmd.replace(lfn, turl)
            logger.info("replaced '%s' with '%s' in the run command", lfn, turl)

    # nothing more to do unless there are input file lists to update
    if not (writetofile and lfn_to_turl):
        return cmd

    # replace the LFNs with TURLs in the writetofile input file list(s)
    for list_name in get_writetoinput_filenames(writetofile):
        list_path = os.path.join(workdir, list_name)
        if not os.path.exists(list_path):
            logger.warning("file does not exist: %s", list_path)
            continue

        updated_lines = []
        for line in read_file(list_path).split('\n'):
            lfn = os.path.basename(line)
            if lfn in lfn_to_turl:
                updated_lines.append(lfn_to_turl[lfn])
            elif line:
                updated_lines.append(line)

        content = '\n'.join(updated_lines)
        if content:
            write_file(list_path, content)

    return cmd