Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:05

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2020
0009 # - Alexander Bogdanchikov, Alexander.Bogdanchikov@cern.ch, 2019-2020
0010 
0011 import os
0012 import pipes
0013 import re
0014 import logging
0015 import traceback
0016 
0017 # for user container test: import urllib
0018 
0019 from pilot.common.errorcodes import ErrorCodes
0020 from pilot.common.exception import PilotException, FileHandlingFailure
0021 from pilot.user.atlas.setup import get_asetup, get_file_system_root_path
0022 from pilot.user.atlas.proxy import verify_proxy
0023 from pilot.info import InfoService, infosys
0024 from pilot.util.auxiliary import is_python3
0025 from pilot.util.config import config
0026 from pilot.util.filehandling import write_file
0027 from pilot.util import https
0028 
0029 logger = logging.getLogger(__name__)
0030 errors = ErrorCodes()
0031 
0032 
def get_payload_proxy(proxy_outfile_name, voms_role='atlas'):
    """
    Download a payload proxy from the panda server and store it in the given file.

    :param proxy_outfile_name: specify the file to store proxy
    :param voms_role: what proxy (role) to request. It should exist on Panda node
    :return: True on success
    """
    try:
        # https_setup() is assumed to have been performed already
        base_url = os.environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver)
        response = https.request('%s/server/panda/getProxy' % base_url, data={'role': voms_role})

        if response is None:
            logger.error("Unable to get proxy with role '%s' from panda server", voms_role)
            return False

        if response['StatusCode'] != 0:
            logger.error("When get proxy with role '%s' panda server returned: %s", voms_role, response['errorDialog'])
            return False

        proxy_contents = response['userProxy']

    except Exception as exc:
        logger.error("Get proxy from panda server failed: %s, %s", exc, traceback.format_exc())
        return False

    status = False
    try:
        # pre-create an empty proxy file with secure (0600) permissions; write_file() cannot
        # set the file mode itself, so it writes into the pre-created file instead
        file_desc = os.open(proxy_outfile_name, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        os.close(file_desc)
        status = write_file(proxy_outfile_name, proxy_contents, mute=False)  # returns True on success
    except (IOError, OSError, FileHandlingFailure) as exc:
        logger.error("Exception when try to save proxy to the file '%s': %s, %s",
                     proxy_outfile_name, exc, traceback.format_exc())

    return status
0070 
0071 
def do_use_container(**kwargs):
    """
    Decide whether to use a container or not.

    :param kwargs: dictionary of key-word arguments (recognized keys: 'job', 'copytool').
    :return: True if function has decided that a container should be used, False otherwise (boolean).
    """

    # to force no container use: return False
    use_container = False

    job = kwargs.get('job', False)
    copytool = kwargs.get('copytool', False)
    if job:
        # for user jobs, TRF option --containerImage must have been used, ie imagename must be set
        if job.imagename and job.imagename != 'NULL':
            use_container = True
            logger.debug('job.imagename set -> use_container = True')
        elif not (job.platform or job.alrbuserplatform):
            use_container = False
            logger.debug('not (job.platform or job.alrbuserplatform) -> use_container = False')
        else:
            queuedata = job.infosys.queuedata
            container_name = queuedata.container_type.get("pilot")
            if container_name:
                use_container = True
                logger.debug('container_name == \'%s\' -> use_container = True', container_name)
            else:
                logger.debug('else -> use_container = False')
    elif copytool:
        # override for copytools - use a container for stage-in/out
        use_container = True
        # bug fix: the message previously claimed 'use_container = False' although True is set
        logger.debug('copytool -> use_container = True')
    else:
        logger.debug('not job -> use_container = False')

    return use_container
0109 
0110 
def wrapper(executable, **kwargs):
    """
    Wrapper function for any container specific usage.
    This function will be called by pilot.util.container.execute() and prepends the executable with a container command.

    :param executable: command to be executed (string).
    :param kwargs: dictionary of key-word arguments.
    :return: executable wrapped with container command (string).
    """

    current_dir = kwargs.get('workdir', '.')
    home_dir = os.environ.get('PILOT_HOME', '')
    job = kwargs.get('job', None)

    logger.info('container wrapper called')

    # fall back to the pilot home directory when no explicit workdir was supplied
    if current_dir == '.' and home_dir != '':
        current_dir = home_dir

    # if job.imagename (from --containerimage <image>) is set, then always use raw singularity
    if config.Container.setup_type == "ALRB":  # and job and not job.imagename:
        return alrb_wrapper(executable, current_dir, job=job)
    return singularity_wrapper(executable, current_dir, job=job)
0136 
0137 
def extract_platform_and_os(platform):
    """
    Extract the platform and OS substring from platform

    :param platform (string): E.g. "x86_64-slc6-gcc48-opt"
    :return: extracted platform specifics (string). E.g. "x86_64-slc6". In case of failure, return the full platform
    """

    pattern = r"([A-Za-z0-9_-]+)-.+-.+"
    matches = re.findall(re.compile(pattern), platform)
    if not matches:
        # fall back to the full platform string when the pattern does not apply
        logger.warning("could not extract architecture and OS substring using pattern=%s from platform=%s"
                       "(will use %s for image name)", pattern, platform, platform)
        return platform

    return matches[0]
0157 
0158 
def get_grid_image_for_singularity(platform):
    """
    Return the full path to the singularity grid image

    :param platform: E.g. "x86_64-slc6" (string).
    :return: full path to grid image (string).
    """

    if not platform or platform == "":
        platform = "x86_64-slc6"
        logger.warning("using default platform=%s (cmtconfig not set)", platform)

    image = extract_platform_and_os(platform) + ".img"
    image_dir = os.path.join(get_file_system_root_path(), "atlas.cern.ch/repo/containers/images/singularity")
    image_path = os.path.join(image_dir, image)
    if os.path.exists(image_path):
        return image_path

    # fall back to the centos7 image when the platform-specific one is absent
    image = 'x86_64-centos7.img'
    logger.warning('path does not exist: %s (trying with image %s instead)', image_path, image)
    image_path = os.path.join(image_dir, image)
    if not os.path.exists(image_path):
        logger.warning('path does not exist either: %s', image_path)
        image_path = ""

    return image_path
0184 
0185 
def get_middleware_type():
    """
    Return the middleware type from the container type.
    E.g. container_type = 'singularity:pilot;docker:wrapper;container:middleware'
    get_middleware_type() -> 'container', meaning that middleware should be taken from the container. The default
    is otherwise 'workernode', i.e. middleware is assumed to be present on the worker node.

    :return: middleware_type (string)
    """

    middleware = 'middleware'
    container_type = infosys.queuedata.container_type

    if not (container_type and middleware in container_type):
        # no middleware type was specified, assume that middleware is present on worker node
        return "workernode"

    middleware_type = ""
    try:
        for entry in container_type.split(';'):
            key_value = entry.split(':')
            if key_value[0] == middleware:
                middleware_type = key_value[1]
    except IndexError as exc:
        logger.warning("failed to parse the container name: %s, %s", container_type, exc)

    return middleware_type
0215 
0216 
def extract_atlas_setup(asetup, swrelease):
    """
    Extract the asetup command from the full setup command for jobs that have a defined release.
    export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;
      source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;source $AtlasSetup/scripts/asetup.sh
    -> $AtlasSetup/scripts/asetup.sh, export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase; source
         ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;

    :param asetup: full asetup command (string).
    :param swrelease: ATLAS release (string).
    :return: extracted asetup command, cleaned up full asetup command without asetup.sh (string).
    """

    if not swrelease:
        return '', ''

    try:
        # the last semicolon-separated piece is 'source $AtlasSetup/scripts/asetup.sh'
        last_piece = asetup.split(';')[-1]
        # everything before that piece is the ALRB part of the setup
        cleaned_atlas_setup = asetup.replace(last_piece, '')
        atlas_setup = last_piece.replace('source ', '')
    except AttributeError as exc:
        logger.debug('exception caught while extracting asetup command: %s', exc)
        atlas_setup, cleaned_atlas_setup = '', ''

    return atlas_setup, cleaned_atlas_setup
0246 
0247 
def extract_full_atlas_setup(cmd, atlas_setup):
    """
    Extract the full asetup (including options) from the payload setup command.
    atlas_setup is typically '$AtlasSetup/scripts/asetup.sh'.

    :param cmd: full payload setup command (string).
    :param atlas_setup: asetup command (string).
    :return: extracted full asetup command, updated full payload setup command without asetup part (string).
    """

    extracted_asetup = ""

    if not atlas_setup:
        return extracted_asetup, cmd

    try:
        remaining = []
        # keep every sub-command except the one containing the asetup call
        for piece in cmd.split(';'):
            if atlas_setup in piece:
                extracted_asetup = piece
            else:
                remaining.append(piece)
        updated_cmd = ';'.join(remaining)
    except AttributeError as exc:
        logger.warning('exception caught while extracting full atlas setup: %s', exc)
        updated_cmd = cmd
    logger.debug('updated payload setup command: %s', updated_cmd)

    return extracted_asetup, updated_cmd
0278 
0279 
def update_alrb_setup(cmd, use_release_setup):
    """
    Update the ALRB setup command.
    Add the ALRB_CONT_SETUPFILE in case the release setup file was created earlier (required available cvmfs).

    :param cmd: full ALRB setup command (string).
    :param use_release_setup: should the release setup file be added to the setup command? (Boolean).
    :return: updated ALRB setup command (string).
    """

    try:
        pieces = []
        for piece in cmd.split(';'):
            # inject the release setup file just before the atlasLocalSetup source line
            if use_release_setup and piece.startswith('source ${ATLAS_LOCAL_ROOT_BASE}'):
                pieces.append('export ALRB_CONT_SETUPFILE="/srv/%s"' % config.Container.release_setup)
            pieces.append(piece)
        updated_cmd = ';'.join(pieces)
    except AttributeError as exc:
        logger.warning('exception caught while extracting full atlas setup: %s', exc)
        updated_cmd = cmd
    logger.debug('updated ALRB command: %s', updated_cmd)

    return updated_cmd
0304 
0305 
def update_for_user_proxy(_cmd, cmd, is_analysis=False):
    """
    Add the X509 user proxy to the container sub command string if set, and remove it from the main container command.
    Try to receive payload proxy and update X509_USER_PROXY in container setup command
    In case payload proxy from server is required, this function will also download and verify this proxy.

    :param _cmd: container setup command (string).
    :param cmd: command the container will execute (string).
    :param is_analysis: True for user job (Boolean).
    :return: exit_code (int), diagnostics (string), updated _cmd (string), updated cmd (string).
    """

    exit_code = 0
    diagnostics = ""

    x509 = os.environ.get('X509_USER_PROXY', '')
    if x509 != "":
        # the command the container executes must not export X509_USER_PROXY itself;
        # the export is moved to the container setup command instead
        cmd = cmd.replace("export X509_USER_PROXY=%s;" % x509, '')

        # download and verify payload proxy from the server if desired
        env = os.environ.get
        proxy_verification = env('PILOT_PROXY_VERIFICATION') == 'True' and env('PILOT_PAYLOAD_PROXY_VERIFICATION') == 'True'
        if proxy_verification and config.Pilot.payload_proxy_from_server and is_analysis:
            exit_code, diagnostics, x509 = get_and_verify_payload_proxy_from_server(x509)
            if exit_code != 0:  # do not return non-zero exit code if only download fails
                logger.warning('payload proxy verification failed')

        # add X509_USER_PROXY setting to the container setup command
        _cmd = "export X509_USER_PROXY=%s;" % x509 + _cmd

    return exit_code, diagnostics, _cmd, cmd
0338 
0339 
def get_and_verify_payload_proxy_from_server(x509):
    """
    Download a payload proxy from the server and verify it.

    :param x509: X509_USER_PROXY (string).
    :return:  exit code (int), diagnostics (string), updated X509_USER_PROXY (string).
    """

    exit_code = 0
    diagnostics = ""

    # compose a new file name for the payload proxy, derived from the pilot proxy name
    x509_payload = re.sub('.proxy$', '', x509) + '-payload.proxy'

    logger.info("download payload proxy from server")
    if not get_payload_proxy(x509_payload):
        logger.warning("get_payload_proxy() failed")
        return exit_code, diagnostics, x509

    logger.info("server returned payload proxy (verifying)")
    exit_code, diagnostics = verify_proxy(x509=x509_payload, proxy_id=None)
    # verify_proxy() signals trouble either via a non-zero exit code or via a
    # non-empty diagnostics string (exit_code=0 with last failure in diagnostics)
    if exit_code != 0 or diagnostics != '':
        logger.warning(diagnostics)
        logger.info("payload proxy verification failed")
    else:
        logger.info("payload proxy verified")
        # from here on the payload proxy replaces the pilot proxy;
        # no user proxy should be in the command the container will execute
        x509 = x509_payload

    return exit_code, diagnostics, x509
0372 
0373 
def set_platform(job, alrb_setup):
    """
    Set thePlatform variable and add it to the sub container command.

    :param job: job object.
    :param alrb_setup: ALRB setup (string).
    :return: updated ALRB setup (string).
    """

    # precedence: ALRB user platform, preprocess container image, stand-alone image name, job platform
    template = 'export thePlatform=\"%s\";'
    if job.alrbuserplatform:
        alrb_setup += template % job.alrbuserplatform
    elif job.preprocess and job.containeroptions:
        alrb_setup += template % job.containeroptions.get('containerImage')
    elif job.imagename:
        alrb_setup += template % job.imagename
    elif job.platform:
        alrb_setup += template % job.platform

    return alrb_setup
0393 
0394 
def get_container_options(container_options):
    """
    Get the container options from AGIS for the container execution command.
    For Raythena ES jobs, replace the -C with "" (otherwise IPC does not work, needed by yampl).

    :param container_options: container options from AGIS (string).
    :return: updated container command (string).
    """

    is_raythena = os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena'

    opts = ''
    # Set the singularity options
    if container_options:
        # the event service payload cannot use -C/--containall since it will prevent yampl from working
        if is_raythena:
            for forbidden in ('-C', '--containall'):
                if forbidden in container_options:
                    container_options = container_options.replace(forbidden, '')
        if container_options:
            opts += '-e \"%s\"' % container_options
    elif not is_raythena:
        # "-C" blocks all environment variables by default ("-c -i -p" would not, but the
        # -i must be skipped to allow the IPC needed by yampl, so raythena gets no option)
        opts += '-e \"-C\"'

    return opts
0428 
0429 
def alrb_wrapper(cmd, workdir, job=None):
    """
    Wrap the given command with the special ALRB setup for containers
    E.g. cmd = /bin/bash hello_world.sh
    ->
    export thePlatform="x86_64-slc6-gcc48-opt"
    export ALRB_CONT_RUNPAYLOAD="cmd'
    setupATLAS -c $thePlatform

    The payload command itself is written to a container script file in job.workdir;
    the returned string is the ALRB/setupATLAS command that runs that script.
    Returns an empty string if the release setup is missing in an unpacked
    container or if the container script cannot be written.

    :param cmd (string): command to be executed in a container.
    :param workdir: (not used)
    :param job: job object.
    :return: prepended command with singularity execution command (string).
    """

    if not job:
        logger.warning('the ALRB wrapper did not get a job object - cannot proceed')
        return cmd

    queuedata = job.infosys.queuedata
    container_name = queuedata.container_type.get("pilot")  # resolve container name for user=pilot
    if container_name:
        # first get the full setup, which should be removed from cmd (or ALRB setup won't work)
        _asetup = get_asetup()
        # get_asetup()
        # -> export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh
        #     --quiet;source $AtlasSetup/scripts/asetup.sh
        # atlas_setup = $AtlasSetup/scripts/asetup.sh
        # clean_asetup = export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source
        #                   ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
        atlas_setup, clean_asetup = extract_atlas_setup(_asetup, job.swrelease)
        full_atlas_setup = get_full_asetup(cmd, 'source ' + atlas_setup) if atlas_setup and clean_asetup else ''

        # do not include 'clean_asetup' in the container script
        if clean_asetup and full_atlas_setup:
            cmd = cmd.replace(clean_asetup, '')
            # for stand-alone containers, do not include the full atlas setup either
            if job.imagename:
                cmd = cmd.replace(full_atlas_setup, '')

        # get_asetup(asetup=False)
        # -> export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;

        # get simplified ALRB setup (export)
        alrb_setup = get_asetup(alrb=True, add_if=True)
        # get_asetup(alrb=True)
        # -> export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;
        # get_asetup(alrb=True, add_if=True)
        # -> if [ -z "$ATLAS_LOCAL_ROOT_BASE" ]; then export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase; fi;

        # add user proxy if necessary (actually it should also be removed from cmd)
        exit_code, diagnostics, alrb_setup, cmd = update_for_user_proxy(alrb_setup, cmd, is_analysis=job.is_analysis())
        if exit_code:
            # proxy verification failure is recorded on the job but does not abort the wrapping
            job.piloterrordiag = diagnostics
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
        # set the platform info
        alrb_setup = set_platform(job, alrb_setup)

        # add the jobid to be used as an identifier for the payload running inside the container
        # it is used to identify the pid for the process to be tracked by the memory monitor
        if 'export PandaID' not in alrb_setup:
            alrb_setup += "export PandaID=%s;" % job.jobid

        # add TMPDIR
        cmd = "export TMPDIR=/srv;export GFORTRAN_TMPDIR=/srv;" + cmd
        cmd = cmd.replace(';;', ';')

        # get the proper release setup script name, and create the script if necessary
        release_setup, cmd = create_release_setup(cmd, atlas_setup, full_atlas_setup, job.swrelease, job.imagename,
                                                  job.workdir, queuedata.is_cvmfs)
        if not cmd:
            # create_release_setup() resets cmd when an unpacked container lacks the release setup script
            diagnostics = 'payload setup was reset due to missing release setup in unpacked container'
            logger.warning(diagnostics)
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGRELEASEUNPACKED)
            return ""

        # correct full payload command in case preprocess command are used (ie replace trf with setupATLAS -c ..)
        if job.preprocess and job.containeroptions:
            cmd = replace_last_command(cmd, job.containeroptions.get('containerExec'))
            logger.debug('updated cmd with containerExec: %s', cmd)

        # write the full payload command to a script file
        container_script = config.Container.container_script
        logger.debug('command to be written to container script file:\n\n%s:\n\n%s\n', container_script, cmd)
        try:
            write_file(os.path.join(job.workdir, container_script), cmd, mute=False)
            os.chmod(os.path.join(job.workdir, container_script), 0o755)  # Python 2/3
        # except (FileHandlingFailure, FileNotFoundError) as exc:  # Python 3
        except (FileHandlingFailure, OSError) as exc:  # Python 2/3
            logger.warning('exception caught: %s', exc)
            return ""

        # also store the command string in the job object
        job.command = cmd

        # add atlasLocalSetup command + options (overwrite the old cmd since the new cmd is the containerised version)
        cmd = add_asetup(job, alrb_setup, queuedata.is_cvmfs, release_setup, container_script, queuedata.container_options)

        # add any container options if set
        execargs = job.containeroptions.get('execArgs', None)
        if execargs:
            cmd += ' ' + execargs
        logger.debug('\n\nfinal command:\n\n%s\n', cmd)
    else:
        logger.warning('container name not defined in CRIC')

    return cmd
0537 
0538 
def is_release_setup(script, imagename):
    """
    Does the release_setup.sh file exist?
    This check can only be made for unpacked containers. These must have the release setup file present, or setup will
    fail. For non-unpacked containers, the function will return True and the pilot will assume that the container has
    the setup file.

    :param script: release setup script (string).
    :param imagename: container/image name (string).
    :return: Boolean.
    """

    if 'unpacked' not in imagename:
        # cannot inspect non-unpacked containers; trust that the script is there
        logger.info('%s is assumed to be present in %s', script, imagename)
        return True

    # drop a single leading slash so the path joins under the image directory
    relative = script[1:] if script.startswith('/') else script
    exists = os.path.exists(os.path.join(imagename, relative))
    if exists:
        logger.info('%s is present in %s', relative, imagename)
    else:
        logger.warning('%s is not present in %s - setup has failed', relative, imagename)
    return exists
0563 
0564 
def add_asetup(job, alrb_setup, is_cvmfs, release_setup, container_script, container_options):
    """
    Add atlasLocalSetup and options to form the final payload command.

    :param job: job object.
    :param alrb_setup: ALRB setup (string).
    :param is_cvmfs: True for cvmfs sites (Boolean).
    :param release_setup: release setup (string).
    :param container_script: container script name (string).
    :param container_options: container options (string).
    :return: final payload command (string).
    """

    setup_script = 'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh'

    # this should not be necessary after the extract_container_image() in JobData update
    # containerImage should have been removed already
    if '--containerImage' in job.jobparams:
        job.jobparams, container_path = remove_container_string(job.jobparams)
        if job.alrbuserplatform:
            if not is_cvmfs:
                alrb_setup += '%s -c %s' % (setup_script, job.alrbuserplatform)
        elif container_path != "":
            alrb_setup += '%s -c %s' % (setup_script, container_path)
        else:
            logger.warning('failed to extract container path from %s', job.jobparams)
            alrb_setup = ""
        if alrb_setup and not is_cvmfs:
            alrb_setup += ' -d'
    else:
        alrb_setup += setup_script + ' '
        if job.platform or job.alrbuserplatform or job.imagename:
            alrb_setup += '-c $thePlatform'
            if not is_cvmfs:
                alrb_setup += ' -d'

    # update the ALRB setup command with the release setup file and the container script
    alrb_setup += ' -s %s' % release_setup
    alrb_setup += ' -r /srv/' + container_script
    alrb_setup = alrb_setup.replace('  ', ' ').replace(';;', ';')

    # add container options
    alrb_setup += ' ' + get_container_options(container_options)
    alrb_setup = alrb_setup.replace('  ', ' ')

    return alrb_setup
0616 
0617 
def get_full_asetup(cmd, atlas_setup):
    """
    Extract the full asetup command from the payload execution command.
    (Easier that generating it again). We need to remove this command for stand-alone containers.
    Alternatively: do not include it in the first place (but this seems to trigger the need for further changes).
    atlas_setup is "source $AtlasSetup/scripts/asetup.sh", which is extracted in a previous step.
    The function typically returns: "source $AtlasSetup/scripts/asetup.sh 21.0,Athena,2020-05-19T2148,notest --makeflags='$MAKEFLAGS';".

    :param cmd: payload execution command (string).
    :param atlas_setup: extracted atlas setup (string).
    :return: full atlas setup, or empty string when atlas_setup does not occur in cmd (string).
    """

    start = cmd.find(atlas_setup)
    if start == -1:
        # bug fix: without this guard, find() returning -1 made cmd[-1:] keep only the last character
        return ''
    cmd = cmd[start:]  # remove everything before 'source $AtlasSetup/..'
    end = cmd.find(';')
    # keep everything up to and including the first ';' (empty string when there is no ';')
    return cmd[:end + 1]
0637 
0638 
def replace_last_command(cmd, replacement):
    """
    Replace the last (semicolon-separated) command in cmd with the given replacement.

    :param cmd: command (string).
    :param replacement: replacement (string).
    :return: updated command (string).
    """

    cmd = cmd.strip('; ')
    parts = cmd.split(';')
    # bug fix: the previous str.replace() on the whole string rewrote every occurrence of the
    # last command's text (including matches inside earlier commands); only touch the last segment
    parts[-1] = parts[-1].replace(parts[-1].strip(), replacement)

    return ';'.join(parts)
0653 
0654 
def create_release_setup(cmd, atlas_setup, full_atlas_setup, release, imagename, workdir, is_cvmfs):
    """
    Get the proper release setup script name, and create the script if necessary.

    This function also updates the cmd string (removes full asetup from payload command).

    Note: for stand-alone containers, the function will return /release_setup.sh and assume that this script exists
    in the container. The pilot will only create a my_release_setup.sh script for OS containers.

    In case the release setup is not present in an unpacked container, the function will reset the cmd string.

    :param cmd: Payload execution command (string).
    :param atlas_setup: asetup command (string).
    :param full_atlas_setup: full asetup command (string).
    :param release: software release, needed to determine Athena environment (string).
    :param imagename: container image name (string).
    :param workdir: job workdir (string).
    :param is_cvmfs: does the queue have cvmfs? (Boolean).
    :return: proper release setup name (string), updated cmd (string).
    """

    release_setup_name = '/srv/my_release_setup.sh'
    has_release = bool(release) and release != 'NULL'

    # default content just announces itself; with cvmfs and a real release, the asetup
    # command extracted from cmd (or the previously extracted full setup) is used instead
    content = 'echo \"INFO: sourcing %s inside the container. ' \
              'This should not run if it is a ATLAS standalone container\"' % release_setup_name
    if is_cvmfs and has_release:
        content, cmd = extract_full_atlas_setup(cmd, atlas_setup)
        content = content or full_atlas_setup

    content += '\nreturn $?'
    logger.debug('command to be written to release setup file:\n\n%s:\n\n%s\n', release_setup_name, content)
    try:
        write_file(os.path.join(workdir, os.path.basename(release_setup_name)), content, mute=False)
    except FileHandlingFailure as exc:
        logger.warning('exception caught: %s', exc)

    # reset cmd in case release_setup.sh does not exist in unpacked image (only for those containers)
    if imagename and has_release:
        cmd = cmd.replace(';;', ';') if is_release_setup(release_setup_name, imagename) else ''

    return release_setup_name, cmd
0698 
0699 
def create_release_setup_old(cmd, atlas_setup, full_atlas_setup, release, imagename, workdir, is_cvmfs):
    """
    Get the proper release setup script name, and create the script if necessary.

    NOTE(review): apparently superseded by create_release_setup() (the '_old' suffix);
    kept for reference - confirm before removing.

    This function also updates the cmd string (removes full asetup from payload command).

    Note: for stand-alone containers, the function will return /release_setup.sh and assume that this script exists
    in the container. The pilot will only create a my_release_setup.sh script for OS containers.

    In case the release setup is not present in an unpacked container, the function will reset the cmd string.

    :param cmd: Payload execution command (string).
    :param atlas_setup: asetup command (string).
    :param full_atlas_setup: full asetup command (string).
    :param release: software release, needed to determine Athena environment (string).
    :param imagename: container image name (string).
    :param workdir: job workdir (string).
    :param is_cvmfs: does the queue have cvmfs? (Boolean).
    :return: proper release setup name (string), updated cmd (string).
    """

    release_setup_name = get_release_setup_name(release, imagename)

    # note: if release_setup_name.startswith('/'), the pilot will NOT create the script
    if not release_setup_name.startswith('/'):
        # extracted_asetup should be written to 'my_release_setup.sh' and cmd to 'container_script.sh'
        content = 'echo \"INFO: sourcing %s inside the container. ' \
                  'This should not run if it is a ATLAS standalone container\"' % release_setup_name
        if is_cvmfs:
            # move the full asetup call from the payload command into the setup script
            content, cmd = extract_full_atlas_setup(cmd, atlas_setup)
            if not content:
                # extraction failed - fall back to the precomputed full asetup command
                content = full_atlas_setup
        if not content:
            logger.debug(
                'will create an empty (almost) release setup file since asetup could not be extracted from command')
        logger.debug('command to be written to release setup file:\n\n%s:\n\n%s\n', release_setup_name, content)
        try:
            write_file(os.path.join(workdir, release_setup_name), content, mute=False)
        except FileHandlingFailure as exc:
            # a failed write is logged but not treated as fatal here
            logger.warning('exception caught: %s', exc)
    else:
        # reset cmd in case release_setup.sh does not exist in unpacked image (only for those containers)
        cmd = cmd.replace(';;', ';') if is_release_setup(release_setup_name, imagename) else ''

    # add the /srv for OS containers
    if not release_setup_name.startswith('/'):
        release_setup_name = os.path.join('/srv', release_setup_name)

    return release_setup_name, cmd
0749 
0750 
def get_release_setup_name(release, imagename):
    """
    Return the file name for the release setup script.

    NOTE: the /srv path will only be added later, in the case of OS containers.

    Stand-alone containers (ie when --containerImage or job.imagename was used/set,
    together with a valid software release) are assumed to carry the setup script
    inside the image, so an absolute name is returned and the pilot will not create
    the script. For OS containers the configured name (or a default) is returned
    and the pilot creates the script itself.

    :param release: software release (string).
    :param imagename: container image name (string).
    :return: release setup file name (string).
    """

    # stand-alone container: script is assumed to exist inside the image
    if imagename and release and release != 'NULL':
        return '/srv/my_release_setup.sh'

    # OS container: script will be created by the pilot
    # (a name starting with '/' would mean that the pilot does NOT create the script)
    return config.Container.release_setup or 'my_release_setup.sh'
0781 
0782 
## DEPRECATED, remove after verification with user container job
def remove_container_string(job_params):
    """
    Extract and remove the --containerImage option from the job parameters.

    Fix: the pattern was compiled but then re.findall()/re.sub() were called with
    the raw pattern string, recompiling it and leaving the compiled object unused;
    the compiled pattern is now used for both operations (behavior unchanged).

    :param job_params: job parameters (string).
    :return: updated job parameters (string), extracted container path (string, empty if not found).
    """

    compiled_pattern = re.compile(r" \'?\-\-containerImage\=?\ ?([\S]+)\ ?\'?")

    # remove any present ' around the option as well
    job_params = re.sub(r'\'\ \'', ' ', job_params)

    # extract the container path (first match, if any)
    found = compiled_pattern.findall(job_params)
    container_path = found[0] if found else ""

    # remove the pattern and update the job parameters
    job_params = compiled_pattern.sub(' ', job_params)

    return job_params, container_path
0801 
0802 
def singularity_wrapper(cmd, workdir, job=None):
    """
    Prepend the given command with the singularity execution command
    E.g. cmd = /bin/bash hello_world.sh
    -> singularity_command = singularity exec -B <bindmountsfromcatchall> <img> /bin/bash hello_world.sh
    singularity exec -B <bindmountsfromcatchall>  /cvmfs/atlas.cern.ch/repo/images/singularity/x86_64-slc6.img <script>
    Note: if the job object is not set, then it is assumed that the middleware container is to be used.

    Only acts when queuedata requests container type 'singularity' for the pilot
    user; otherwise the command is returned unchanged.

    :param cmd: command to be prepended (string).
    :param workdir: explicit work directory where the command should be executed (needs to be set for Singularity) (string).
    :param job: job object.
    :return: prepended command with singularity execution command (string).
    """

    # resolve queuedata either from the job or, when no job is given (middleware
    # container case), from a freshly initialized info service
    if job:
        queuedata = job.infosys.queuedata
    else:
        infoservice = InfoService()
        infoservice.init(os.environ.get('PILOT_SITENAME'), infosys.confinfo, infosys.extinfo)
        queuedata = infoservice.queuedata

    container_name = queuedata.container_type.get("pilot")  # resolve container name for user=pilot
    logger.debug("resolved container_name from queuedata.container_type: %s", container_name)

    if container_name == 'singularity':
        logger.info("singularity has been requested")

        # Get the singularity options: append the standard bind mounts to any
        # options from queuedata (or start a fresh -B option if there are none)
        singularity_options = queuedata.container_options
        if singularity_options != "":
            singularity_options += ","
        else:
            singularity_options = "-B "
        singularity_options += "/cvmfs,${workdir},/home"
        logger.debug("using singularity_options: %s", singularity_options)

        # Get the image path: job image takes precedence, otherwise fall back to
        # the grid image for the job platform; no job -> configured middleware container
        if job:
            image_path = job.imagename or get_grid_image_for_singularity(job.platform)
        else:
            image_path = config.Container.middleware_container

        # Does the image exist?
        if image_path:
            # Prepend it to the given command; ${workdir} in the bind mounts and
            # $workdir in the quoted payload are expanded by the shell at run time
            # NOTE(review): pipes.quote is deprecated (the pipes module was removed
            # in Python 3.13) - shlex.quote is the drop-in replacement
            cmd = "export workdir=" + workdir + "; singularity --verbose exec " + singularity_options + " " + image_path + \
                  " /bin/bash -c " + pipes.quote("cd $workdir;pwd;%s" % cmd)

            # for testing user containers
            # singularity_options = "-B $PWD:/data --pwd / "
            # singularity_cmd = "singularity exec " + singularity_options + image_path
            # cmd = re.sub(r'-p "([A-Za-z0-9.%/]+)"', r'-p "%s\1"' % urllib.pathname2url(singularity_cmd), cmd)
        else:
            logger.warning("singularity options found but image does not exist")

        logger.info("updated command: %s", cmd)

    return cmd
0861 
0862 
def create_root_container_command(workdir, cmd):
    """
    Create the container command for executing a ROOT payload.

    The given command is wrapped into an open_file.sh script in the work
    directory, and an ALRB-based container command that sources that script
    is returned. A PilotException raised while writing the script propagates
    to the caller (the previous 'except PilotException as exc: raise exc'
    was a no-op catch-and-reraise and has been removed).

    :param workdir: working directory where the script will be stored (string).
    :param cmd: ROOT command to run inside the container (string).
    :return: container command to be executed (string).
    :raises PilotException: if the script cannot be written.
    """

    command = 'cd %s;' % workdir
    content = get_root_container_script(cmd)
    script_name = 'open_file.sh'

    # any PilotException from the write is intentionally left to propagate
    status = write_file(os.path.join(workdir, script_name), content)
    if status:
        # generate the final container command
        x509 = os.environ.get('X509_USER_PROXY', '')
        if x509:
            command += 'export X509_USER_PROXY=%s;' % x509
        command += 'export ALRB_CONT_RUNPAYLOAD=\"source /srv/%s\";' % script_name
        command += get_asetup(alrb=True)  # export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;
        command += 'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh -c CentOS7'

    logger.debug('container command: %s', command)

    return command
0892 
0893 
def create_middleware_container_command(workdir, cmd, container_options, label='stagein', proxy=True):
    """
    Build the container command that runs an isolated stage-in/out command.

    The isolated command is wrapped into a small script (stagein.sh, stageout.sh
    or general.sh, depending on the label) that is written to the work directory.
    The returned command executes that script inside an ALRB-launched container.

    :param workdir: working directory where script will be stored (string).
    :param cmd: isolated stage-in/out command (string).
    :param container_options: container options from queuedata (string).
    :param label: 'stage-in', 'stage-out' or anything else for a general script (string).
    :param proxy: export X509_USER_PROXY into the container if set (Boolean).
    :return: container command to be executed (string).
    """

    command = 'cd %s;' % workdir

    # add bits and pieces for the containerisation
    middleware_container = get_middleware_container(label=label)
    content = get_middleware_container_script(middleware_container, cmd, label=label)

    # pick the script name from the label
    script_name = {'stage-in': 'stagein.sh', 'stage-out': 'stageout.sh'}.get(label, 'general.sh')

    try:
        status = write_file(os.path.join(workdir, script_name), content)
    except PilotException as exc:
        raise exc

    if status:
        # generate the final container command
        if proxy:
            x509 = os.environ.get('X509_USER_PROXY', '')
            if x509:
                command += 'export X509_USER_PROXY=%s;' % x509
        command += 'export ALRB_CONT_RUNPAYLOAD=\"source /srv/%s\";' % script_name
        if 'ALRB_CONT_UNPACKEDDIR' in os.environ:
            command += 'export ALRB_CONT_UNPACKEDDIR=%s;' % os.environ.get('ALRB_CONT_UNPACKEDDIR')
        command += get_asetup(alrb=True)  # export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;
        command += 'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh -c %s' % middleware_container
        command += ' ' + get_container_options(container_options)
        command = command.replace('  ', ' ')

    logger.debug('container command: %s', command)

    return command
0951 
0952 
def get_root_container_script(cmd):
    """
    Build the script content used to run a ROOT command inside a container.

    :param cmd: root command (string).
    :return: script content (string).
    """

    # time-stamp around the lsetup step, then run the command and propagate its exit code
    steps = ['date', "lsetup 'root pilot'", 'date', 'python %s' % cmd, 'exit $?']
    content = '\n'.join(steps)
    logger.debug('root setup script content:\n\n%s\n\n', content)

    return content
0966 
0967 
def get_middleware_container_script(middleware_container, cmd, asetup=False, label=''):
    """
    Return the content of the middleware container script.
    If asetup is True, atlasLocalSetup will be added to the command.

    :param middleware_container: container image (string).
    :param cmd: isolated stage-in/out command (string).
    :param asetup: optional True/False (boolean).
    :param label: optional 'stagein'/'stageout' selector for middleware setup (string).
    :return: script content (string).
    """

    # export the Rucio site name so the middleware knows which site it runs at
    sitename = 'export PILOT_RUCIO_SITENAME=%s; ' % os.environ.get('PILOT_RUCIO_SITENAME')
    if 'rucio' in middleware_container:
        content = sitename + 'python3 %s ' % cmd  # only works with python 3
    else:
        content = ''
        if is_python3():
            content += 'export ALRB_LOCAL_PY3=YES; '
        if asetup:  # export ATLAS_LOCAL_ROOT_BASE=/cvmfs/..;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
            content += get_asetup(asetup=False)
        if label == 'stagein' or label == 'stageout':
            # set up the middleware tools before running the stage-in/out command
            content += sitename + 'lsetup rucio davix xrootd; '
            # NOTE(review): the python2 variant lacks the trailing space of the python3 one - confirm intended
            content += 'python3 %s ' % cmd if is_python3() else 'python %s' % cmd
        else:
            content += cmd
    if not asetup:
        # propagate the command's exit code out of the script
        content += '\nexit $?'

    logger.debug('middleware container content:\n%s', content)

    return content
0999 
1000 
def get_middleware_container(label=None):
    """
    Return the middleware container image to be used.

    The 'general' label always maps to CentOS7. Otherwise the image is taken
    from the pilot config; a configured absolute path that does not exist on
    disk falls back to CentOS7 (unless ALRB_CONT_UNPACKEDDIR is set, in which
    case the no-path config value is used as-is).

    :param label: label (string).
    :return: path (string).
    """

    if label == 'general':
        return 'CentOS7'

    if 'ALRB_CONT_UNPACKEDDIR' in os.environ:
        image = config.Container.middleware_container_no_path
    else:
        image = config.Container.middleware_container
        if image.startswith('/') and not os.path.exists(image):
            logger.warning('requested middleware container path does not exist: %s (switching to default value)', image)
            image = 'CentOS7'

    logger.info('using image: %s for middleware container', image)

    return image