File indexing completed on 2026-04-11 08:41:05
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import os
0012 import pipes
0013 import re
0014 import logging
0015 import traceback
0016
0017
0018
0019 from pilot.common.errorcodes import ErrorCodes
0020 from pilot.common.exception import PilotException, FileHandlingFailure
0021 from pilot.user.atlas.setup import get_asetup, get_file_system_root_path
0022 from pilot.user.atlas.proxy import verify_proxy
0023 from pilot.info import InfoService, infosys
0024 from pilot.util.auxiliary import is_python3
0025 from pilot.util.config import config
0026 from pilot.util.filehandling import write_file
0027 from pilot.util import https
0028
0029 logger = logging.getLogger(__name__)
0030 errors = ErrorCodes()
0031
0032
def get_payload_proxy(proxy_outfile_name, voms_role='atlas'):
    """
    Download a payload proxy from the PanDA server and store it in a local file.

    The server URL is read from the PANDA_SERVER_URL environment variable, with
    config.Pilot.pandaserver as fallback. Any failure while talking to the server is logged
    and reported as False; the same goes for failures while writing the file.

    :param proxy_outfile_name: path of the file that will receive the proxy (string).
    :param voms_role: proxy (role) to request; it has to exist on the PanDA node (string).
    :return: False on any failure, otherwise whatever write_file() returned (True on success).
    """

    try:
        server = os.environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver)
        endpoint = '{pandaserver}/server/panda/getProxy'.format(pandaserver=server)
        reply = https.request(endpoint, data={'role': voms_role})

        if reply is None:
            logger.error("Unable to get proxy with role '%s' from panda server", voms_role)
            return False

        if reply['StatusCode'] != 0:
            logger.error("When get proxy with role '%s' panda server returned: %s", voms_role, reply['errorDialog'])
            return False

        proxy_contents = reply['userProxy']
    except Exception as exc:
        # deliberately broad: a malformed reply (missing keys) is treated like a failed download
        logger.error("Get proxy from panda server failed: %s, %s", exc, traceback.format_exc())
        return False

    try:
        # create (or truncate) the file requesting owner-only permissions before the proxy is written to it
        os.close(os.open(proxy_outfile_name, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600))
        return write_file(proxy_outfile_name, proxy_contents, mute=False)
    except (IOError, OSError, FileHandlingFailure) as exc:
        logger.error("Exception when try to save proxy to the file '%s': %s, %s",
                     proxy_outfile_name, exc, traceback.format_exc())

    return False
0070
0071
def do_use_container(**kwargs):
    """
    Decide whether to use a container or not.

    With a job object, a container is used when the job defines an image name (other than 'NULL'),
    or when the job has a platform (or ALRB user platform) and the queue defines a container for
    the 'pilot' user. Without a job object, a container is used only when a copytool is given.

    :param kwargs: dictionary of key-word arguments; 'job' (job object) and 'copytool' are read.
    :return: True if function has decided that a container should be used, False otherwise (boolean).
    """

    use_container = False

    job = kwargs.get('job', False)
    copytool = kwargs.get('copytool', False)
    if job:
        if job.imagename and job.imagename != 'NULL':
            use_container = True
            logger.debug('job.imagename set -> use_container = True')
        elif not (job.platform or job.alrbuserplatform):
            use_container = False
            logger.debug('not (job.platform or job.alrbuserplatform) -> use_container = False')
        else:
            # fall back on the queue configuration: is a container defined for user=pilot?
            queuedata = job.infosys.queuedata
            container_name = queuedata.container_type.get("pilot")
            if container_name:
                use_container = True
                logger.debug('container_name == \'%s\' -> use_container = True', container_name)
            else:
                logger.debug('else -> use_container = False')
    elif copytool:
        use_container = True
        # the message used to claim 'False' although the decision taken here is True
        logger.debug('copytool -> use_container = True')
    else:
        logger.debug('not job -> use_container = False')

    return use_container
0109
0110
def wrapper(executable, **kwargs):
    """
    Entry point for container specific usage.

    Dispatches to alrb_wrapper() when config.Container.setup_type is "ALRB" and to
    singularity_wrapper() otherwise; the chosen function prepends the container command.

    :param executable: command to be executed (string).
    :param kwargs: dictionary of key-word arguments; 'workdir' and 'job' are read.
    :return: executable wrapped with container command (string).
    """

    logger.info('container wrapper called')

    job = kwargs.get('job', None)
    workdir = kwargs.get('workdir', '.')

    # an unspecified work directory falls back on PILOT_HOME, when that is set
    if workdir == '.':
        pilot_home = os.environ.get('PILOT_HOME', '')
        if pilot_home != '':
            workdir = pilot_home

    containerise = alrb_wrapper if config.Container.setup_type == "ALRB" else singularity_wrapper

    return containerise(executable, workdir, job=job)
0136
0137
def extract_platform_and_os(platform):
    """
    Return the leading architecture-and-OS part of a platform string.

    :param platform: platform string, e.g. "x86_64-slc6-gcc48-opt" (string).
    :return: extracted substring, e.g. "x86_64-slc6"; the unchanged platform if the pattern does not match (string).
    """

    pattern = r"([A-Za-z0-9_-]+)-.+-.+"

    match = re.search(pattern, platform)
    if match:
        return match.group(1)

    logger.warning("could not extract architecture and OS substring using pattern=%s from platform=%s"
                   "(will use %s for image name)", pattern, platform, platform)

    return platform
0157
0158
def get_grid_image_for_singularity(platform):
    """
    Locate the singularity grid image matching the given platform.

    The image is looked up below <file system root>/atlas.cern.ch/repo/containers/images/singularity.
    If the platform specific image is missing, 'x86_64-centos7.img' is tried instead.

    :param platform: E.g. "x86_64-slc6" (string); "x86_64-slc6" is used when not set.
    :return: full path to grid image, or an empty string if no image file was found (string).
    """

    if not platform:
        platform = "x86_64-slc6"
        logger.warning("using default platform=%s (cmtconfig not set)", platform)

    image = extract_platform_and_os(platform) + ".img"
    images_dir = os.path.join(get_file_system_root_path(), "atlas.cern.ch/repo/containers/images/singularity")

    path = os.path.join(images_dir, image)
    if os.path.exists(path):
        return path

    # platform specific image is not available, try the fallback image
    image = 'x86_64-centos7.img'
    logger.warning('path does not exist: %s (trying with image %s instead)', path, image)
    path = os.path.join(images_dir, image)
    if os.path.exists(path):
        return path

    logger.warning('path does not exist either: %s', path)

    return ""
0184
0185
def get_middleware_type():
    """
    Return the middleware type from the container type.

    E.g. container_type = 'singularity:pilot;docker:wrapper;container:middleware'
    get_middleware_type() -> 'container', meaning that middleware should be taken from the container. The default
    is otherwise 'workernode', i.e. middleware is assumed to be present on the worker node.

    The queuedata container_type is accepted both as a raw 'name:value;name:value' string and as a mapping
    (the other functions in this module read it with .get("pilot")); previously a mapping raised an
    uncaught AttributeError on .split().

    :return: middleware_type (string); an empty string if 'middleware' is mentioned but no value could be extracted.
    """

    middleware = 'middleware'
    container_type = infosys.queuedata.container_type

    # nothing defined for the middleware -> it is assumed to be present on the worker node
    if not container_type or middleware not in container_type:
        return "workernode"

    # mapping form, e.g. {'pilot': 'singularity', 'middleware': 'container'}
    if isinstance(container_type, dict):
        return container_type.get(middleware) or ""

    # raw string form
    middleware_type = ""
    try:
        for name in container_type.split(';'):
            _split = name.split(':')
            if middleware == _split[0]:
                middleware_type = _split[1]
    except IndexError as exc:
        logger.warning("failed to parse the container name: %s, %s", container_type, exc)

    return middleware_type
0215
0216
def extract_atlas_setup(asetup, swrelease):
    """
    Split the full setup command into the asetup script and the remaining setup, for jobs with a defined release.

    Example:
      export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;
      source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;source $AtlasSetup/scripts/asetup.sh
      -> ('$AtlasSetup/scripts/asetup.sh',
          'export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;source
          ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;')

    :param asetup: full asetup command (string).
    :param swrelease: ATLAS release (string).
    :return: extracted asetup command, cleaned up full asetup command without asetup.sh (string).
             Two empty strings are returned if there is no release or asetup is not a string.
    """

    if not swrelease:
        return '', ''

    try:
        # the asetup command is whatever follows the last ';'
        last_command = asetup.rpartition(';')[2]
        cleaned_atlas_setup = asetup.replace(last_command, '')
        atlas_setup = last_command.replace('source ', '')
    except AttributeError as exc:
        logger.debug('exception caught while extracting asetup command: %s', exc)
        return '', ''

    return atlas_setup, cleaned_atlas_setup
0246
0247
def extract_full_atlas_setup(cmd, atlas_setup):
    """
    Pull the full asetup sub-command (including its options) out of the payload setup command.

    The command is split on ';'; every sub-command containing atlas_setup is removed from the command and
    the last such sub-command is returned. atlas_setup is typically '$AtlasSetup/scripts/asetup.sh'.

    :param cmd: full payload setup command (string).
    :param atlas_setup: asetup command (string).
    :return: extracted full asetup command, updated full payload setup command without asetup part (string).
    """

    if not atlas_setup:
        return "", cmd

    extracted_asetup = ""
    try:
        kept = []
        for piece in cmd.split(';'):
            if atlas_setup in piece:
                extracted_asetup = piece
                continue
            kept.append(piece)
        updated_cmd = ';'.join(kept)
    except AttributeError as exc:
        logger.warning('exception caught while extracting full atlas setup: %s', exc)
        updated_cmd = cmd
    logger.debug('updated payload setup command: %s', updated_cmd)

    return extracted_asetup, updated_cmd
0278
0279
def update_alrb_setup(cmd, use_release_setup):
    """
    Insert the ALRB_CONT_SETUPFILE export into the ALRB setup command, if requested.

    When use_release_setup is set, 'export ALRB_CONT_SETUPFILE="/srv/<config.Container.release_setup>"' is
    placed in front of every sub-command starting with 'source ${ATLAS_LOCAL_ROOT_BASE}'.

    :param cmd: full ALRB setup command (string).
    :param use_release_setup: should the release setup file be added to the setup command? (Boolean).
    :return: updated ALRB setup command (string); cmd itself if it could not be split.
    """

    try:
        pieces = []
        for piece in cmd.split(';'):
            if use_release_setup and piece.startswith('source ${ATLAS_LOCAL_ROOT_BASE}'):
                pieces.append('export ALRB_CONT_SETUPFILE="/srv/%s"' % config.Container.release_setup)
            pieces.append(piece)
        updated_cmd = ';'.join(pieces)
    except AttributeError as exc:
        logger.warning('exception caught while extracting full atlas setup: %s', exc)
        updated_cmd = cmd
    logger.debug('updated ALRB command: %s', updated_cmd)

    return updated_cmd
0304
0305
def update_for_user_proxy(_cmd, cmd, is_analysis=False):
    """
    Move the X509_USER_PROXY export from the payload command to the container setup command.

    Nothing is changed when X509_USER_PROXY is not set in the environment. Otherwise the export is removed
    from cmd and prepended to _cmd. For user jobs, and only if both PILOT_PROXY_VERIFICATION and
    PILOT_PAYLOAD_PROXY_VERIFICATION are 'True' and config.Pilot.payload_proxy_from_server is set, a payload
    proxy is first downloaded from the server and verified; the proxy path returned by that step is the one
    that gets exported.

    :param _cmd: container setup command (string).
    :param cmd: command the container will execute (string).
    :param is_analysis: True for user job (Boolean).
    :return: exit_code (int), diagnostics (string), updated _cmd (string), updated cmd (string).
    """

    exit_code, diagnostics = 0, ""

    x509 = os.environ.get('X509_USER_PROXY', '')
    if x509 == "":
        return exit_code, diagnostics, _cmd, cmd

    export_proxy = "export X509_USER_PROXY=%s;"

    # the proxy export must not be part of the command executed inside the container
    cmd = cmd.replace(export_proxy % x509, '')

    verify_pilot_proxy = os.environ.get('PILOT_PROXY_VERIFICATION') == 'True'
    verify_payload_proxy = os.environ.get('PILOT_PAYLOAD_PROXY_VERIFICATION') == 'True'
    if verify_pilot_proxy and verify_payload_proxy and config.Pilot.payload_proxy_from_server and is_analysis:
        exit_code, diagnostics, x509 = get_and_verify_payload_proxy_from_server(x509)
        if exit_code != 0:
            logger.warning('payload proxy verification failed')

    # the (possibly updated) proxy is exported by the container setup command instead
    _cmd = export_proxy % x509 + _cmd

    return exit_code, diagnostics, _cmd, cmd
0338
0339
def get_and_verify_payload_proxy_from_server(x509):
    """
    Download a payload proxy from the server and verify it.

    The payload proxy is stored next to the given proxy, in a file named after it: a trailing '.proxy'
    suffix is dropped and '-payload.proxy' is appended. The returned proxy path is only switched to the
    payload proxy when the download succeeded and the verification reported no problem.

    :param x509: X509_USER_PROXY (string).
    :return: exit code (int), diagnostics (string), updated X509_USER_PROXY (string).
    """

    exit_code = 0
    diagnostics = ""

    # compose the file name for the payload proxy; the dot is escaped so that only a literal '.proxy'
    # suffix is removed (an unescaped dot also swallowed the last character of names like 'myproxy')
    x509_payload = re.sub(r'\.proxy$', '', x509) + '-payload.proxy'

    logger.info("download payload proxy from server")
    if get_payload_proxy(x509_payload):
        logger.info("server returned payload proxy (verifying)")
        exit_code, diagnostics = verify_proxy(x509=x509_payload, proxy_id=None)

        # a zero exit code together with non-empty diagnostics is also treated as a failed verification
        if exit_code != 0 or (exit_code == 0 and diagnostics != ''):
            logger.warning(diagnostics)
            logger.info("payload proxy verification failed")
        else:
            logger.info("payload proxy verified")
            x509 = x509_payload
    else:
        logger.warning("get_payload_proxy() failed")

    return exit_code, diagnostics, x509
0372
0373
def set_platform(job, alrb_setup):
    """
    Append the export of the thePlatform variable to the sub container command.

    The value is taken from the first of these that applies: job.alrbuserplatform, the 'containerImage'
    container option (only for jobs with a preprocess step and container options), job.imagename,
    job.platform. If none applies, the setup is returned unchanged.

    :param job: job object.
    :param alrb_setup: ALRB setup (string).
    :return: updated ALRB setup (string).
    """

    if job.alrbuserplatform:
        platform = job.alrbuserplatform
    elif job.preprocess and job.containeroptions:
        platform = job.containeroptions.get('containerImage')
    elif job.imagename:
        platform = job.imagename
    elif job.platform:
        platform = job.platform
    else:
        return alrb_setup

    return alrb_setup + 'export thePlatform=\"%s\";' % platform
0393
0394
def get_container_options(container_options):
    """
    Build the '-e "<options>"' argument for the container execution command.

    Without any options, '-e "-C"' is used, except for Raythena ES jobs
    (PILOT_ES_EXECUTOR_TYPE=raythena) where nothing is added. For Raythena jobs, -C and --containall
    are also stripped from the given options (otherwise IPC does not work, needed by yampl).

    :param container_options: container options from AGIS (string).
    :return: updated container command (string), possibly empty.
    """

    is_raythena = os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena'

    if not container_options:
        return '' if is_raythena else '-e \"-C\"'

    if is_raythena:
        container_options = container_options.replace('-C', '').replace('--containall', '')

    return '-e \"%s\"' % container_options if container_options else ''
0428
0429
def alrb_wrapper(cmd, workdir, job=None):
    """
    Wrap the given command with the special ALRB setup for containers.

    E.g. cmd = /bin/bash hello_world.sh
    ->
    export thePlatform="x86_64-slc6-gcc48-opt"
    export ALRB_CONT_RUNPAYLOAD="cmd'
    setupATLAS -c $thePlatform

    The payload command is written to the container script (config.Container.container_script) in the job
    work directory and is also stored in job.command. The returned command is the ALRB setup that runs that
    script in the container. The unchanged cmd is returned if there is no job object or if the queue does
    not define a container for the 'pilot' user; an empty string is returned if the release setup is
    missing in an unpacked container or if the container script could not be written.

    :param cmd (string): command to be executed in a container.
    :param workdir: (not used)
    :param job: job object.
    :return: prepended command with singularity execution command (string).
    """

    if not job:
        logger.warning('the ALRB wrapper did not get a job object - cannot proceed')
        return cmd

    queuedata = job.infosys.queuedata
    if not queuedata.container_type.get("pilot"):
        logger.warning('container name not defined in CRIC')
        return cmd

    # locate the asetup parts in the payload command; they must not end up in the container script
    atlas_setup, clean_asetup = extract_atlas_setup(get_asetup(), job.swrelease)
    if atlas_setup and clean_asetup:
        full_atlas_setup = get_full_asetup(cmd, 'source ' + atlas_setup)
    else:
        full_atlas_setup = ''

    if clean_asetup and full_atlas_setup:
        cmd = cmd.replace(clean_asetup, '')
        # jobs with their own image do not get the full atlas setup either
        if job.imagename:
            cmd = cmd.replace(full_atlas_setup, '')

    # start from the simplified ALRB setup and move the user proxy export to it
    exit_code, diagnostics, alrb_setup, cmd = update_for_user_proxy(get_asetup(alrb=True, add_if=True), cmd,
                                                                    is_analysis=job.is_analysis())
    if exit_code:
        job.piloterrordiag = diagnostics
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)

    alrb_setup = set_platform(job, alrb_setup)

    # make the job id available in the setup, unless it is already there
    if 'export PandaID' not in alrb_setup:
        alrb_setup += "export PandaID=%s;" % job.jobid

    cmd = ("export TMPDIR=/srv;export GFORTRAN_TMPDIR=/srv;" + cmd).replace(';;', ';')

    # get the release setup script name (the script is created on the fly if necessary)
    release_setup, cmd = create_release_setup(cmd, atlas_setup, full_atlas_setup, job.swrelease, job.imagename,
                                              job.workdir, queuedata.is_cvmfs)
    if not cmd:
        diagnostics = 'payload setup was reset due to missing release setup in unpacked container'
        logger.warning(diagnostics)
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGRELEASEUNPACKED)
        return ""

    # jobs with a preprocess step run the containerExec command instead of the last payload command
    if job.preprocess and job.containeroptions:
        cmd = replace_last_command(cmd, job.containeroptions.get('containerExec'))
        logger.debug('updated cmd with containerExec: %s', cmd)

    # store the payload command in an executable script in the job work directory
    container_script = config.Container.container_script
    logger.debug('command to be written to container script file:\n\n%s:\n\n%s\n', container_script, cmd)
    script_path = os.path.join(job.workdir, container_script)
    try:
        write_file(script_path, cmd, mute=False)
        os.chmod(script_path, 0o755)
    except (FileHandlingFailure, OSError) as exc:
        logger.warning('exception caught: %s', exc)
        return ""

    job.command = cmd

    # from here on, cmd is the containerised version of the command
    cmd = add_asetup(job, alrb_setup, queuedata.is_cvmfs, release_setup, container_script, queuedata.container_options)

    execargs = job.containeroptions.get('execArgs', None)
    if execargs:
        cmd += ' ' + execargs
    logger.debug('\n\nfinal command:\n\n%s\n', cmd)

    return cmd
0537
0538
def is_release_setup(script, imagename):
    """
    Check whether the release setup script exists in the image.

    The check can only be made for unpacked containers, i.e. when 'unpacked' is part of the image name; the
    script is then looked up relative to the image directory. For any other image, the script is assumed to
    be present and True is returned.

    :param script: release setup script (string).
    :param imagename: container/image name (string).
    :return: Boolean.
    """

    if 'unpacked' not in imagename:
        logger.info('%s is assumed to be present in %s', script, imagename)
        return True

    # make the script path relative, or os.path.join() would discard the image directory
    if script.startswith('/'):
        script = script[1:]

    exists = os.path.exists(os.path.join(imagename, script))
    if exists:
        logger.info('%s is present in %s', script, imagename)
    else:
        logger.warning('%s is not present in %s - setup has failed', script, imagename)

    return exists
0563
0564
def add_asetup(job, alrb_setup, is_cvmfs, release_setup, container_script, container_options):
    """
    Add atlasLocalSetup and options to form the final payload command.

    If the job parameters still contain a --containerImage option, it is removed from job.jobparams (the
    job object is updated) and the extracted container path is used with the -c option, unless
    job.alrbuserplatform is set. Otherwise '-c $thePlatform' is used when the job defines a platform, an
    ALRB user platform or an image name. On non-cvmfs sites, ' -d' is added after the -c option.

    Doubled spaces and doubled semicolons produced by the concatenations are collapsed.

    :param job: job object.
    :param alrb_setup: ALRB setup (string).
    :param is_cvmfs: True for cvmfs sites (Boolean).
    :param release_setup: release setup (string).
    :param container_script: container script name (string).
    :param container_options: container options (string).
    :return: final payload command (string).
    """

    source_alrb = 'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh'

    if '--containerImage' in job.jobparams:
        job.jobparams, container_path = remove_container_string(job.jobparams)
        if job.alrbuserplatform:
            if not is_cvmfs:
                alrb_setup += source_alrb + ' -c %s' % job.alrbuserplatform
        elif container_path != "":
            alrb_setup += source_alrb + ' -c %s' % container_path
        else:
            logger.warning('failed to extract container path from %s', job.jobparams)
            alrb_setup = ""
        if alrb_setup and not is_cvmfs:
            alrb_setup += ' -d'
    else:
        alrb_setup += source_alrb + ' '
        if job.platform or job.alrbuserplatform or job.imagename:
            alrb_setup += '-c $thePlatform'
            if not is_cvmfs:
                alrb_setup += ' -d'

    # add the release setup and the script to be run inside the container
    alrb_setup += ' -s %s' % release_setup
    alrb_setup += ' -r /srv/' + container_script
    # collapse doubled spaces (e.g. 'atlasLocalSetup.sh ' followed by ' -s') and doubled semicolons;
    # the replacement used to map a single space onto itself, which did nothing
    alrb_setup = alrb_setup.replace('  ', ' ').replace(';;', ';')

    # add container options (may be empty, which leaves a trailing space)
    alrb_setup += ' ' + get_container_options(container_options)
    alrb_setup = alrb_setup.replace('  ', ' ')

    return alrb_setup
0616
0617
def get_full_asetup(cmd, atlas_setup):
    """
    Extract the full asetup command from the payload execution command.

    (Easier than generating it again). We need to remove this command for stand-alone containers.
    atlas_setup is "source $AtlasSetup/scripts/asetup.sh", which is extracted in a previous step.
    The function typically returns:
    "source $AtlasSetup/scripts/asetup.sh 21.0,Athena,2020-05-19T2148,notest --makeflags='$MAKEFLAGS';".

    An empty string is returned when atlas_setup does not occur in cmd. (Previously the failed search
    index -1 was used for slicing, which could return a lone ';' - and a caller removing that 'setup'
    from the payload command would strip all its semicolons.) An empty string is also returned when the
    asetup command is not terminated by a ';'.

    :param cmd: payload execution command (string).
    :param atlas_setup: extracted atlas setup (string).
    :return: full atlas setup, including the terminating ';' (string).
    """

    start = cmd.find(atlas_setup)
    if start == -1:
        return ''

    # the full asetup runs up to, and includes, the next ';'
    end = cmd.find(';', start)

    return cmd[start:end + 1] if end != -1 else ''
0637
0638
def replace_last_command(cmd, replacement):
    """
    Replace the last command in cmd with given replacement.

    Leading/trailing semicolons and spaces are first stripped from cmd. Only the final ';'-separated
    command is replaced; earlier commands are left alone even if they contain the same text (a plain
    string replacement used to rewrite every occurrence). Whitespace around the last command is kept.

    :param cmd: command (string).
    :param replacement: replacement (string).
    :return: updated command (string).
    """

    cmd = cmd.strip('; ')
    head, separator, last_bit = cmd.rpartition(';')

    # locate the command text inside the last bit, so that surrounding whitespace is preserved
    start = len(last_bit) - len(last_bit.lstrip())
    end = start + len(last_bit.strip())

    return head + separator + last_bit[:start] + replacement + last_bit[end:]
0653
0654
def create_release_setup(cmd, atlas_setup, full_atlas_setup, release, imagename, workdir, is_cvmfs):
    """
    Create the release setup script in the job work directory and return its name inside the container.

    The script (my_release_setup.sh) gets the full asetup command when the queue has cvmfs and a release
    is defined; in that case the asetup part is also removed from the returned cmd. Otherwise the script
    only echoes an info message. The script always ends with 'return $?'. A failure to write the script
    is logged only.

    If both an image name and a release are defined, is_release_setup() is consulted: cmd is reset to an
    empty string when the release setup is reported missing (only possible for unpacked containers).

    :param cmd: Payload execution command (string).
    :param atlas_setup: asetup command (string).
    :param full_atlas_setup: full asetup command (string).
    :param release: software release, needed to determine Athena environment (string).
    :param imagename: container image name (string).
    :param workdir: job workdir (string).
    :param is_cvmfs: does the queue have cvmfs? (Boolean).
    :return: proper release setup name (string), updated cmd (string).
    """

    release_setup_name = '/srv/my_release_setup.sh'
    has_release = release and release != 'NULL'

    if is_cvmfs and has_release:
        # the asetup goes to the release setup script, the rest stays in the payload command
        content, cmd = extract_full_atlas_setup(cmd, atlas_setup)
        if not content:
            content = full_atlas_setup
    else:
        content = 'echo \"INFO: sourcing %s inside the container. ' \
                  'This should not run if it is a ATLAS standalone container\"' % release_setup_name

    content += '\nreturn $?'
    logger.debug('command to be written to release setup file:\n\n%s:\n\n%s\n', release_setup_name, content)
    script_path = os.path.join(workdir, os.path.basename(release_setup_name))
    try:
        write_file(script_path, content, mute=False)
    except FileHandlingFailure as exc:
        logger.warning('exception caught: %s', exc)

    if imagename and has_release:
        if is_release_setup(release_setup_name, imagename):
            cmd = cmd.replace(';;', ';')
        else:
            cmd = ''

    return release_setup_name, cmd
0698
0699
def create_release_setup_old(cmd, atlas_setup, full_atlas_setup, release, imagename, workdir, is_cvmfs):
    """
    Older variant of create_release_setup(), driven by get_release_setup_name().

    If the release setup name is an absolute path, no script is created; instead is_release_setup() is
    consulted and cmd is reset to an empty string when the script is reported missing. Otherwise the
    script is created in the job work directory (with the asetup extracted from cmd on cvmfs sites, or
    an info message elsewhere) and the returned name is prefixed with /srv.

    :param cmd: Payload execution command (string).
    :param atlas_setup: asetup command (string).
    :param full_atlas_setup: full asetup command (string).
    :param release: software release, needed to determine Athena environment (string).
    :param imagename: container image name (string).
    :param workdir: job workdir (string).
    :param is_cvmfs: does the queue have cvmfs? (Boolean).
    :return: proper release setup name (string), updated cmd (string).
    """

    release_setup_name = get_release_setup_name(release, imagename)

    # absolute path: the script is expected to exist already, nothing is created
    if release_setup_name.startswith('/'):
        if is_release_setup(release_setup_name, imagename):
            cmd = cmd.replace(';;', ';')
        else:
            cmd = ''
        return release_setup_name, cmd

    if is_cvmfs:
        content, cmd = extract_full_atlas_setup(cmd, atlas_setup)
        if not content:
            content = full_atlas_setup
    else:
        content = 'echo \"INFO: sourcing %s inside the container. ' \
                  'This should not run if it is a ATLAS standalone container\"' % release_setup_name
    if not content:
        logger.debug(
            'will create an empty (almost) release setup file since asetup could not be extracted from command')
    logger.debug('command to be written to release setup file:\n\n%s:\n\n%s\n', release_setup_name, content)
    try:
        write_file(os.path.join(workdir, release_setup_name), content, mute=False)
    except FileHandlingFailure as exc:
        logger.warning('exception caught: %s', exc)

    # inside the container, the script is found below /srv
    return os.path.join('/srv', release_setup_name), cmd
0749
0750
def get_release_setup_name(release, imagename):
    """
    Return the file name for the release setup script.

    When both an image name and a release (other than 'NULL') are defined, the absolute path
    '/srv/my_release_setup.sh' is returned. Otherwise the name configured in
    config.Container.release_setup is returned, with 'my_release_setup.sh' as fallback; no directory is
    added to it here.

    :param release: software release (string).
    :param imagename: container image name (string).
    :return: release setup file name (string).
    """

    if imagename and release and release != 'NULL':
        return '/srv/my_release_setup.sh'

    return config.Container.release_setup or 'my_release_setup.sh'
0781
0782
0783
def remove_container_string(job_params):
    """
    Extract the --containerImage option from the job parameters.

    Any "' '" sequence in the job parameters is first replaced by a single space. Every occurrence of the
    option is then replaced by a single space; the value of the first occurrence is returned.

    :param job_params: job parameters (string).
    :return: job parameters without the option (string), container path or "" if the option is not present (string).
    """

    option = re.compile(r" \'?\-\-containerImage\=?\ ?([\S]+)\ ?\'?")

    job_params = job_params.replace("' '", ' ')

    match = option.search(job_params)
    container_path = match.group(1) if match else ""

    return option.sub(' ', job_params), container_path
0801
0802
def singularity_wrapper(cmd, workdir, job=None):
    """
    Prepend the given command with the singularity execution command.

    E.g. cmd = /bin/bash hello_world.sh
    -> singularity_command = singularity exec -B <bindmountsfromcatchall> <img> /bin/bash hello_world.sh
    singularity exec -B <bindmountsfromcatchall> /cvmfs/atlas.cern.ch/repo/images/singularity/x86_64-slc6.img <script>
    Note: if the job object is not set, then it is assumed that the middleware container is to be used.

    The command is returned unchanged unless the queue defines 'singularity' as the container for the
    'pilot' user and an image could be determined.

    :param cmd: command to be prepended (string).
    :param workdir: explicit work directory where the command should be executed (needs to be set for Singularity) (string).
    :param job: job object.
    :return: prepended command with singularity execution command (string).
    """

    if job:
        queuedata = job.infosys.queuedata
    else:
        # no job object: set up a separate info service to get hold of the queuedata
        infoservice = InfoService()
        infoservice.init(os.environ.get('PILOT_SITENAME'), infosys.confinfo, infosys.extinfo)
        queuedata = infoservice.queuedata

    container_name = queuedata.container_type.get("pilot")
    logger.debug("resolved container_name from queuedata.container_type: %s", container_name)

    if container_name != 'singularity':
        return cmd

    logger.info("singularity has been requested")

    # extend the options from the queue with the default bind mounts
    options = queuedata.container_options
    options = options + "," if options != "" else "-B "
    options += "/cvmfs,${workdir},/home"
    logger.debug("using singularity_options: %s", options)

    if job:
        image_path = job.imagename or get_grid_image_for_singularity(job.platform)
    else:
        image_path = config.Container.middleware_container

    if image_path:
        # the original command is executed from the work directory, inside the container
        quoted_payload = pipes.quote("cd $workdir;pwd;%s" % cmd)
        cmd = "".join(["export workdir=", workdir, "; singularity --verbose exec ", options, " ", image_path,
                       " /bin/bash -c ", quoted_payload])
    else:
        logger.warning("singularity options found but image does not exist")

    logger.info("updated command: %s", cmd)

    return cmd
0861
0862
def create_root_container_command(workdir, cmd):
    """
    Create the container command for running a ROOT based python command.

    The script produced by get_root_container_script() is written to open_file.sh in the work directory.
    If write_file() reports success, the returned command changes to the work directory, exports
    X509_USER_PROXY (when set) and ALRB_CONT_RUNPAYLOAD, and sources atlasLocalSetup.sh with a CentOS7
    container. Otherwise only the 'cd <workdir>;' part is returned. Exceptions raised by write_file()
    are passed on to the caller.

    :param workdir: working directory where the script will be stored (string).
    :param cmd: command to be run with python inside the container (string).
    :return: container command to be executed (string).
    """

    script_name = 'open_file.sh'
    content = get_root_container_script(cmd)

    pieces = ['cd %s;' % workdir]
    if write_file(os.path.join(workdir, script_name), content):
        x509 = os.environ.get('X509_USER_PROXY', '')
        if x509:
            pieces.append('export X509_USER_PROXY=%s;' % x509)
        pieces.append('export ALRB_CONT_RUNPAYLOAD=\"source /srv/%s\";' % script_name)
        pieces.append(get_asetup(alrb=True))
        pieces.append('source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh -c CentOS7')
    command = ''.join(pieces)

    logger.debug('container command: %s', command)

    return command
0892
0893
def create_middleware_container_command(workdir, cmd, container_options, label='stagein', proxy=True):
    """
    Create the stage-in/out container command.

    The function takes the isolated stage-in/out command, adds bits and pieces needed for the containerisation and stores
    it in a stage[in|out].sh script file. It then generates the actual command that will execute the stage-in/out script in a
    container.

    new cmd:
      lsetup rucio davis xrootd
      old cmd
      exit $?
    write new cmd to stage[in|out].sh script
    create container command and return it

    Both label spellings used in this module ('stage-in'/'stagein' and 'stage-out'/'stageout') select the
    corresponding script name; any other label uses general.sh. (The default label 'stagein' previously
    never matched and always ended up with general.sh.)

    If write_file() does not report success, only the 'cd <workdir>;' part is returned. Exceptions raised
    by write_file() are passed on to the caller.

    :param workdir: working directory where script will be stored (string).
    :param cmd: isolated stage-in/out command (string).
    :param container_options: container options from queuedata (string).
    :param label: 'stage-[in|out]' or 'stage[in|out]' (string).
    :param proxy: should X509_USER_PROXY (if set) be exported in the container command? (Boolean).
    :return: container command to be executed (string).
    """

    command = 'cd %s;' % workdir

    # add bits and pieces for the containerisation
    middleware_container = get_middleware_container(label=label)
    content = get_middleware_container_script(middleware_container, cmd, label=label)

    if label in ('stage-in', 'stagein'):
        script_name = 'stagein.sh'
    elif label in ('stage-out', 'stageout'):
        script_name = 'stageout.sh'
    else:
        script_name = 'general.sh'

    status = write_file(os.path.join(workdir, script_name), content)
    if status:
        # generate the final container command
        if proxy:
            x509 = os.environ.get('X509_USER_PROXY', '')
            if x509:
                command += 'export X509_USER_PROXY=%s;' % x509
        command += 'export ALRB_CONT_RUNPAYLOAD=\"source /srv/%s\";' % script_name
        if 'ALRB_CONT_UNPACKEDDIR' in os.environ:
            command += 'export ALRB_CONT_UNPACKEDDIR=%s;' % os.environ.get('ALRB_CONT_UNPACKEDDIR')
        command += get_asetup(alrb=True)
        command += 'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh -c %s' % middleware_container
        command += ' ' + get_container_options(container_options)
        # collapse doubled spaces; the replacement used to map a single space onto itself, which did nothing
        command = command.replace('  ', ' ')

    logger.debug('container command: %s', command)

    return command
0951
0952
def get_root_container_script(cmd):
    """
    Build the content of the root container script.

    The script sets up 'root pilot' with lsetup, runs the given command with python and exits with
    the exit code of that command; 'date' is called before and after the setup.

    :param cmd: root command (string).
    :return: script content (string).
    """

    script_lines = ['date', 'lsetup \'root pilot\'', 'date', 'python %s' % cmd, 'exit $?']
    content = '\n'.join(script_lines)
    logger.debug('root setup script content:\n\n%s\n\n', content)

    return content
0966
0967
def get_middleware_container_script(middleware_container, cmd, asetup=False, label=''):
    """
    Build the content of the middleware container script.

    For a container with 'rucio' in its name, the command is simply run with python3, after exporting
    PILOT_RUCIO_SITENAME. Otherwise, for labels 'stagein' and 'stageout', rucio, davix and xrootd are set
    up with lsetup before the command is run with python3 or python; for any other label the command is
    used as it is. If asetup is True, atlasLocalSetup will be added to the command (not for rucio
    containers) and the final 'exit $?' is left out.

    :param middleware_container: container image (string).
    :param cmd: isolated stage-in/out command (string).
    :param asetup: optional True/False (boolean).
    :param label: optional label, e.g. 'stagein' or 'stageout' (string).
    :return: script content (string).
    """

    sitename = 'export PILOT_RUCIO_SITENAME=%s; ' % os.environ.get('PILOT_RUCIO_SITENAME')

    if 'rucio' in middleware_container:
        pieces = [sitename, 'python3 %s ' % cmd]
    else:
        pieces = []
        python3 = is_python3()
        if python3:
            pieces.append('export ALRB_LOCAL_PY3=YES; ')
        if asetup:
            pieces.append(get_asetup(asetup=False))
        if label in ('stagein', 'stageout'):
            pieces.append(sitename + 'lsetup rucio davix xrootd; ')
            pieces.append('python3 %s ' % cmd if python3 else 'python %s' % cmd)
        else:
            pieces.append(cmd)

    if not asetup:
        pieces.append('\nexit $?')
    content = ''.join(pieces)

    logger.debug('middleware container content:\n%s', content)

    return content
0999
1000
def get_middleware_container(label=None):
    """
    Return the middleware container.

    'CentOS7' is always used for label 'general'. Otherwise the image is taken from the configuration:
    config.Container.middleware_container_no_path if ALRB_CONT_UNPACKEDDIR is set in the environment, else
    config.Container.middleware_container - which is replaced by 'CentOS7' if it is an absolute path that
    does not exist.

    :param label: label (string).
    :return: path (string).
    """

    default_image = 'CentOS7'

    if label == 'general':
        return default_image

    if 'ALRB_CONT_UNPACKEDDIR' not in os.environ:
        path = config.Container.middleware_container
        if path.startswith('/') and not os.path.exists(path):
            logger.warning('requested middleware container path does not exist: %s (switching to default value)', path)
            path = default_image
    else:
        path = config.Container.middleware_container_no_path
    logger.info('using image: %s for middleware container', path)

    return path
1021 return path