Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2020-2021
0009 
0010 from os import environ, path, getcwd  #, chmod
0011 
0012 from pilot.common.errorcodes import ErrorCodes
0013 from pilot.common.exception import PilotException, StageInFailure, StageOutFailure
0014 from pilot.util.config import config
0015 from pilot.util.container import execute
0016 from pilot.util.filehandling import copy, read_json, write_json, write_file, copy_pilot_source  #, find_executable
0017 
0018 import logging
logger = logging.getLogger(__name__)
errors = ErrorCodes()  # shared ErrorCodes instance; handle_updated_job_object() uses it to add pilot error codes to the job
0021 
0022 
def containerise_general_command(job, container_options, label='command', container_type='container'):
    """
    Containerise a general command by execution in a script that can be run in a container.

    The command to run is job.debug_command. It is wrapped by create_middleware_container_command() from the
    user-specific module pilot.user.<PILOT_USER>.container (PILOT_USER defaults to 'generic') and then run with
    execute(). Execution failures (an exception or a non-zero exit code) are only logged, not raised.

    :param job: job object.
    :param container_options: container options from queuedata (string).
    :param label: label used in the log messages (string).
    :param container_type: optional 'container/bash'; only 'container' is currently supported (string).
    :raises PilotException: for general failures, including an unsupported container_type.
    :return:
    """

    if container_type != 'container':
        logger.warning('not yet implemented')
        raise PilotException('container_type=%s is not supported for general commands' % container_type)

    # add bits and pieces needed to run the cmd in a container
    pilot_user = environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
    # a PilotException raised here propagates unchanged to the caller
    cmd = user.create_middleware_container_command(job.workdir, job.debug_command, container_options,
                                                   label=label, proxy=False)

    try:
        logger.info('*** executing %s (logging will be redirected) ***', label)
        exit_code, _, _ = execute(cmd, job=job, usecontainer=False)
    except Exception as exc:
        # best-effort: a failing general command must not abort the caller
        logger.info('*** %s has failed ***', label)
        logger.warning('exception caught: %s', exc)
    else:
        if exit_code == 0:
            logger.info('*** %s has finished ***', label)
        else:
            logger.info('*** %s has failed ***', label)
        logger.debug('%s script returned exit_code=%d', label, exit_code)
0061 
0062 
def containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite, container_options, external_dir,
                            label='stage-in', container_type='container'):
    """
    Containerise the middleware by performing stage-in/out steps in a script that in turn can be run in a container.

    Note: a container will only be used for option container_type='container'. If this is 'bash', then stage-in/out
    will still be done by a script, but not containerised.

    Note: this function is tailor made for stage-in/out.

    The script's stdout/stderr are written to the files named by get_logfile_names() in job.workdir. If execute()
    itself raises, the exception is only logged; the outcome is then decided by handle_updated_job_object(), which
    reads the status file written by the script.

    :param job: job object.
    :param xdata: list of FileSpec objects.
    :param queue: queue name (string).
    :param eventtype: event type, passed on to the script as --eventtype.
    :param localsite: local site, passed on to the script as --localsite.
    :param remotesite: remote site, passed on to the script as --remotesite.
    :param container_options: container options from queuedata (string).
    :param external_dir: input or output files directory (string).
    :param label: optional 'stage-in/out' (String).
    :param container_type: optional 'container/bash'
    :raises PilotException: when the command cannot be built (raised by get_command() or the user module).
    :raises StageInFailure: for stage-in failures
    :raises StageOutFailure: for stage-out failures
    :return:
    """

    failure = StageInFailure if label == 'stage-in' else StageOutFailure

    # get the name of the stage-in/out isolation script
    if label == 'stage-in':
        script = config.Container.middleware_container_stagein_script
    else:
        script = config.Container.middleware_container_stageout_script

    # a PilotException raised while building the command propagates unchanged to the caller
    cmd = get_command(job, xdata, queue, script, eventtype, localsite, remotesite, external_dir, label=label,
                      container_type=container_type)

    if container_type == 'container':
        # add bits and pieces needed to run the cmd in a container
        pilot_user = environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
        cmd = user.create_middleware_container_command(job.workdir, cmd, container_options, label=label)
    else:
        logger.warning('%s will not be done in a container (but it will be done by a script)', label)

    try:
        logger.info('*** executing %s (logging will be redirected) ***', label)
        exit_code, stdout, stderr = execute(cmd, job=job, usecontainer=False)
    except Exception as exc:
        logger.info('*** %s has failed ***', label)
        logger.warning('exception caught: %s', exc)
    else:
        if exit_code == 0:
            logger.info('*** %s has finished ***', label)
        else:
            logger.info('*** %s has failed ***', label)
            logger.warning('stderr:\n%s', stderr)
            logger.warning('stdout:\n%s', stdout)
        logger.debug('%s script returned exit_code=%d', label, exit_code)

        # write stdout+stderr to files
        try:
            _stdout_name, _stderr_name = get_logfile_names(label)
            write_file(path.join(job.workdir, _stdout_name), stdout, mute=False)
            write_file(path.join(job.workdir, _stderr_name), stderr, mute=False)
        except PilotException as exc:
            raise failure('exception caught: %s' % exc)

    # handle errors, file statuses, etc (the stage-in/out scripts write errors and file status to a json file);
    # StageInFailure/StageOutFailure raised there propagate to the caller
    handle_updated_job_object(job, xdata, label=label)
0141 
0142 
def get_script_path(script):
    """
    Locate a pilot script on disk.

    The script is searched for in $PILOT_SOURCE_DIR/pilot/scripts (PILOT_SOURCE_DIR defaults to '.'). If that
    directory does not exist, the alternative layout $PILOT_SOURCE_DIR/pilot2/pilot/scripts is used instead.

    :param script: script name (string).
    :return: full path to the script, or '' when the script does not exist (string).
    """

    source_dir = environ.get('PILOT_SOURCE_DIR', '.')

    scripts_dir = path.join(source_dir, 'pilot/scripts')
    if not path.exists(scripts_dir):
        # pilot source checked out inside a 'pilot2' sub-directory
        scripts_dir = path.join(source_dir, 'pilot2', 'pilot/scripts')

    candidate = path.join(scripts_dir, script)
    return candidate if path.exists(candidate) else ''
0161 
0162 
def get_command(job, xdata, queue, script, eventtype, localsite, remotesite, external_dir, label='stage-in', container_type='container'):
    """
    Get the middleware container execution command.

    Note: this function is tailor made for stage-in/out.

    Side effects:
    - for stage-in, the replica dictionary (see get_filedata()) is written as json to
      job.workdir/config.Container.stagein_replica_dictionary;
    - the pilot source is copied into job.workdir (copy_pilot_source()), and job.workdir is added to PYTHONPATH
      (once; it is not appended again if already present);
    - the script is copied from job.workdir/pilot/scripts to job.workdir.

    :param job: job object.
    :param xdata: list of FileSpec objects.
    :param queue: queue name (string).
    :param script: name of stage-in/out script (string).
    :param eventtype: event type, passed on as --eventtype.
    :param localsite: local site, passed on as --localsite.
    :param remotesite: remote site, passed on as --remotesite.
    :param external_dir: input or output files directory (string).
    :param label: optional 'stage-[in|out]' (string).
    :param container_type: optional 'container/bash' (string).
    :return: stage-in/out command (string).
    :raises PilotException: for stage-in/out related failures
    """

    if label == 'stage-out':
        filedata_dictionary = get_filedata_strings(xdata)
    else:
        filedata_dictionary = get_filedata(xdata)

        # write file data to file (it is read back by the stage-in script)
        try:
            status = write_json(path.join(job.workdir, config.Container.stagein_replica_dictionary), filedata_dictionary)
        except Exception as exc:
            diagnostics = 'exception caught in get_command(): %s' % exc
            logger.warning(diagnostics)
            raise PilotException(diagnostics)
        if not status:
            diagnostics = 'failed to write replica dictionary to file'
            logger.warning(diagnostics)
            raise PilotException(diagnostics)

    # copy pilot source into container directory, unless it is already there
    diagnostics = copy_pilot_source(job.workdir)
    if diagnostics:
        raise PilotException(diagnostics)

    # make the pilot source copy in the workdir importable; PYTHONPATH may be unset, and this function is called
    # for both stage-in and stage-out, so do not append the same directory twice
    pythonpath = environ.get('PYTHONPATH', '')
    if job.workdir not in pythonpath.split(':'):
        environ['PYTHONPATH'] = (pythonpath + ':' + job.workdir) if pythonpath else job.workdir

    # copy the script from the pilot source copy to the top of the workdir
    final_script_path = path.join(job.workdir, script)
    copy(path.join(job.workdir, 'pilot/scripts', script), final_script_path)

    if container_type == 'container':
        # correct the path when containers have been used
        final_script_path = path.join('.', script)
        workdir = '/srv'
    else:
        # for container_type=bash we need to add the rucio setup
        pilot_user = environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
        try:
            final_script_path = user.get_middleware_container_script('', final_script_path, asetup=True)
        except PilotException:
            # fall back to running the script directly without any setup
            final_script_path = 'python %s' % final_script_path
        workdir = job.workdir

    cmd = "%s -d -w %s -q %s --eventtype=%s --localsite=%s --remotesite=%s --produserid=\"%s\" --jobid=%s" % \
          (final_script_path, workdir, queue, eventtype, localsite, remotesite, job.produserid.replace(' ', '%20'), job.jobid)

    if label == 'stage-in':
        cmd += " --eventservicemerge=%s --usepcache=%s --usevp=%s --replicadictionary=%s" % \
               (job.is_eventservicemerge, job.infosys.queuedata.use_pcache, job.use_vp, config.Container.stagein_replica_dictionary)
        if external_dir:
            cmd += ' --inputdir=%s' % external_dir
    else:  # stage-out
        cmd += ' --lfns=%s --scopes=%s --datasets=%s --ddmendpoints=%s --guids=%s' % \
               (filedata_dictionary['lfns'], filedata_dictionary['scopes'], filedata_dictionary['datasets'],
                filedata_dictionary['ddmendpoints'], filedata_dictionary['guids'])
        if external_dir:
            cmd += ' --outputdir=%s' % external_dir

    cmd += ' --taskid=%s' % job.taskid
    cmd += ' --jobdefinitionid=%s' % job.jobdefinitionid
    cmd += ' --catchall=%s' % job.infosys.queuedata.catchall

    if container_type == 'bash':
        # propagate the script's exit code when run as a bash script
        cmd += '\nexit $?'

    return cmd
0249 
0250 
def handle_updated_job_object(job, xdata, label='stage-in'):
    """
    Handle updated job object fields.

    Read the json status dictionary written by the stage-in/out script (config.Container.stagein_status_dictionary
    or stageout_status_dictionary in job.workdir; a '.log'-suffixed version is preferred when it exists) and copy
    the per-file results into the FileSpec objects. A non-zero error code in the dictionary's 'error' entry is
    added to job.piloterrorcodes/job.piloterrordiags.

    Entries as read below:
    - stage-in:  file_dictionary[lfn] = [status, status_code, turl, ddmendpoint]
    - stage-out: file_dictionary[lfn] = [status, status_code, surl, turl, adler32 checksum, filesize]
    - both:      file_dictionary['error'] = [error_diag, error_code]

    :param job: job object.
    :param xdata: list of FileSpec objects.
    :param label: 'stage-in/out' (string).
    :return:
    :raises: StageInFailure, StageOutFailure (for a missing or incomplete status dictionary)
    """

    failure = StageInFailure if label == 'stage-in' else StageOutFailure
    dictionary_name = config.Container.stagein_status_dictionary if label == 'stage-in' else config.Container.stageout_status_dictionary

    # read the JSON file created by the stage-in/out script
    if path.exists(path.join(job.workdir, dictionary_name + '.log')):
        dictionary_name += '.log'
    file_dictionary = read_json(path.join(job.workdir, dictionary_name))

    if not file_dictionary:
        msg = "%s file dictionary not found" % label
        logger.warning(msg)
        raise failure(msg)

    # update the job object accordingly: get file info and set essential parameters
    for fspec in xdata:
        try:
            entry = file_dictionary[fspec.lfn]
            fspec.status = entry[0]
            fspec.status_code = entry[1]
            if label == 'stage-in':
                fspec.turl = entry[2]
                fspec.ddmendpoint = entry[3]
            else:
                fspec.surl = entry[2]
                fspec.turl = entry[3]
                fspec.checksum['adler32'] = entry[4]
                fspec.filesize = entry[5]
        except Exception as exc:
            msg = "exception caught while reading file dictionary: %s" % exc
            logger.warning(msg)
            raise failure(msg)

    # get main error info ('error': [error_diag, error_code]); a missing/malformed entry is reported as a
    # stage-in/out failure instead of leaking a raw KeyError/IndexError to the caller
    try:
        error_diag = file_dictionary['error'][0]
        error_code = file_dictionary['error'][1]
    except (KeyError, IndexError, TypeError) as exc:
        msg = "exception caught while reading error info from file dictionary: %s" % exc
        logger.warning(msg)
        raise failure(msg)

    if error_code:
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error_code, msg=error_diag)
0304 
0305 
def get_logfile_names(label):
    """
    Return the file names used for the redirected stdout/stderr of the stage-in/out script.

    The names are read from config.Container (middleware_stage[in|out]_std[out|err]); an empty or unset
    configuration value is replaced by a built-in default name.

    :param label: 'stage-[in|out]'; any label other than 'stage-in' is treated as stage-out (string).
    :return: stdout file name (string), stderr file name (string).
    """

    container_config = config.Container
    if label == 'stage-in':
        stdout_name = container_config.middleware_stagein_stdout
        stderr_name = container_config.middleware_stagein_stderr
        default_stdout, default_stderr = 'stagein_stdout.txt', 'stagein_stderr.txt'
    else:
        stdout_name = container_config.middleware_stageout_stdout
        stderr_name = container_config.middleware_stageout_stderr
        default_stdout, default_stderr = 'stageout_stdout.txt', 'stageout_stderr.txt'

    return stdout_name or default_stdout, stderr_name or default_stderr
0326 
0327 
def get_filedata(data):
    """
    Build the replica dictionary handed to the stage-in script.

    Note: this dictionary will be written to a file that will be read back by the stage-in script inside the container.
    Dictionary format:
        { lfn1: { 'guid': guid1, 'scope': scope1, 'dataset': dataset1, 'ddmendpoint': ddmendpoint1,
                  'filesize': filesize1, 'checksum': checksum1, 'allowlan': allowlan1, 'allowwan': allowwan1,
                  'directaccesslan': directaccesslan1, 'directaccesswan': directaccesswan1, 'istar': istar1,
                  'accessmode': accessmode1, 'storagetoken': storagetoken1}, lfn2: .. }

    'checksum' is the md5 checksum when the file has only an md5 checksum, otherwise the adler32 checksum (the
    string 'None' when that is missing). A FileSpec whose attributes cannot be read is skipped with a warning.

    :param data: list of FileSpec objects.
    :return: file dictionary keyed by LFN (dictionary).
    """

    replicas = {}
    for spec in data:
        try:
            checksums = spec.checksum
            only_md5 = 'md5' in checksums and 'adler32' not in checksums
            checksum_type = 'md5' if only_md5 else 'adler32'
            entry = {
                'guid': spec.guid,
                'scope': spec.scope,
                'dataset': spec.dataset,
                'ddmendpoint': spec.ddmendpoint,
                'filesize': spec.filesize,
                'checksum': checksums.get(checksum_type, 'None'),
                'allowlan': spec.allow_lan,
                'allowwan': spec.allow_wan,
                'directaccesslan': spec.direct_access_lan,
                'directaccesswan': spec.direct_access_wan,
                'istar': spec.is_tar,
                'accessmode': spec.accessmode,
                'storagetoken': spec.storage_token,
            }
            replicas[spec.lfn] = entry
        except Exception as exc:
            logger.warning('exception caught in get_filedata(): %s', exc)

    return replicas
0364 
0365 
def get_filedata_strings(data):
    """
    Return a dictionary with comma-separated lists of LFNs, guids, scopes, datasets, ddmendpoints, etc.

    Every value holds one comma-separated element per FileSpec, in the order of data, so the i-th element of each
    list refers to the same file (an empty value is kept as an empty element). Values are rendered with str(); a
    missing accessmode or storage token, and a missing checksum, are rendered as 'None'. The checksum is the md5
    checksum when the file has only an md5 checksum, otherwise the adler32 checksum.

    :param data: job [in|out]data (list of FileSpec objects).
    :return: {'lfns': lfns, ..} (dictionary of strings; every value is '' when data is empty).
    """

    keys = ('lfns', 'guids', 'scopes', 'datasets', 'ddmendpoints', 'filesizes', 'checksums', 'allowlans',
            'allowwans', 'directaccesslans', 'directaccesswans', 'istars', 'accessmodes', 'storagetokens')
    columns = {key: [] for key in keys}

    for fspec in data:
        _type = 'md5' if ('md5' in fspec.checksum and 'adler32' not in fspec.checksum) else 'adler32'
        values = (fspec.lfn, fspec.guid, fspec.scope, fspec.dataset, fspec.ddmendpoint, fspec.filesize,
                  fspec.checksum.get(_type, 'None'), fspec.allow_lan, fspec.allow_wan, fspec.direct_access_lan,
                  fspec.direct_access_wan, fspec.is_tar, fspec.accessmode or 'None', fspec.storage_token or 'None')
        for key, value in zip(keys, values):
            columns[key].append(str(value))

    # join per column instead of growing strings in the loop: an empty or None value for the first file can then
    # neither shift the lists out of alignment nor raise a TypeError
    return {key: ','.join(columns[key]) for key in keys}
0411 
0412 
def use_middleware_script(container_type):
    """
    Should the pilot use a script for the stage-in/out?
    Check the container_type (from queuedata) if 'middleware' is set to 'container' or 'bash'.

    Note: True does not imply containerisation; with 'bash' the script is run without a container.

    :param container_type: container type (string).
    :return: Boolean (True if stage-in/out should be done by a script, i.e. container_type is 'container' or 'bash').
    """

    return container_type in ('container', 'bash')