File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 from os import environ, path, getcwd
0011
0012 from pilot.common.errorcodes import ErrorCodes
0013 from pilot.common.exception import PilotException, StageInFailure, StageOutFailure
0014 from pilot.util.config import config
0015 from pilot.util.container import execute
0016 from pilot.util.filehandling import copy, read_json, write_json, write_file, copy_pilot_source
0017
0018 import logging
# Module logger; all diagnostics in this module go through it.
logger = logging.getLogger(name=__name__)
# Error-code registry; handle_updated_job_object() uses it to record the stage-in/out script's error code on the job.
errors = ErrorCodes()
0021
0022
def containerise_general_command(job, container_options, label='command', container_type='container'):
    """
    Containerise a general command by execution in a script that can be run in a container.

    The command that is executed is job.debug_command, wrapped by the pilot user's
    create_middleware_container_command(). Only container_type='container' is implemented.
    A failure of the execution itself (exception or non-zero exit code) is logged, not raised.

    :param job: job object.
    :param container_options: container options from queuedata (string).
    :param label: label used in log messages and passed to the container command builder (string).
    :param container_type: optional 'container/bash' (string); only 'container' is implemented.
    :raises PilotException: if container_type is not 'container', or if the container command cannot be created.
    :return:
    """

    if container_type != 'container':
        logger.warning('not yet implemented')
        raise PilotException('containerisation of a general command is not implemented for container_type=%s'
                             % container_type)

    # the pilot user module (PILOT_USER, default 'generic') provides the container wrapper;
    # a PilotException raised by the command builder propagates to the caller
    pilot_user = environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [pilot_user], 0)
    cmd = user.create_middleware_container_command(job.workdir, job.debug_command, container_options,
                                                   label=label, proxy=False)

    try:
        logger.info('*** executing %s (logging will be redirected) ***', label)
        exit_code, _stdout, _stderr = execute(cmd, job=job, usecontainer=False)
    except Exception as exc:
        logger.info('*** %s has failed ***', label)
        logger.warning('exception caught: %s', exc)
    else:
        if exit_code == 0:
            logger.info('*** %s has finished ***', label)
        else:
            logger.info('*** %s has failed ***', label)
        logger.debug('%s script returned exit_code=%d', label, exit_code)
0061
0062
def containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite, container_options, external_dir,
                            label='stage-in', container_type='container'):
    """
    Containerise the middleware by performing stage-in/out steps in a script that in turn can be run in a container.

    Note: a container will only be used for option container_type='container'. If this is 'bash', then stage-in/out
    will still be done by a script, but not containerised.

    Note: this function is tailor made for stage-in/out.

    A failure of the script execution (exception or non-zero exit code) is only logged; the outcome is then taken
    from the status dictionary the script leaves in the work directory (see handle_updated_job_object()).
    When execute() returns, its stdout/stderr are also written to files in job.workdir
    (names from get_logfile_names()).

    :param job: job object.
    :param xdata: list of FileSpec objects.
    :param queue: queue name (string).
    :param eventtype: passed on to the script as --eventtype.
    :param localsite: passed on to the script as --localsite.
    :param remotesite: passed on to the script as --remotesite.
    :param container_options: container options from queuedata (string).
    :param external_dir: input or output files directory (string).
    :param label: optional 'stage-in/out' (string); anything other than 'stage-in' is treated as stage-out.
    :param container_type: optional 'container/bash'
    :raises PilotException: if the command cannot be prepared (from get_command() or the container wrapper).
    :raises StageInFailure: for stage-in failures
    :raises StageOutFailure: for stage-out failures
    :return:
    """

    if label == 'stage-in':
        script = config.Container.middleware_container_stagein_script
    else:
        script = config.Container.middleware_container_stageout_script

    # a PilotException raised while building the command propagates to the caller
    cmd = get_command(job, xdata, queue, script, eventtype, localsite, remotesite, external_dir,
                      label=label, container_type=container_type)

    if container_type == 'container':
        # the pilot user module (PILOT_USER, default 'generic') wraps the command for container execution
        pilot_user = environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [pilot_user], 0)
        cmd = user.create_middleware_container_command(job.workdir, cmd, container_options, label=label)
    else:
        logger.warning('%s will not be done in a container (but it will be done by a script)', label)

    try:
        logger.info('*** executing %s (logging will be redirected) ***', label)
        exit_code, stdout, stderr = execute(cmd, job=job, usecontainer=False)
    except Exception as exc:
        logger.info('*** %s has failed ***', label)
        logger.warning('exception caught: %s', exc)
    else:
        if exit_code == 0:
            logger.info('*** %s has finished ***', label)
        else:
            logger.info('*** %s has failed ***', label)
            logger.warning('stderr:\n%s', stderr)
            logger.warning('stdout:\n%s', stdout)
        logger.debug('%s script returned exit_code=%d', label, exit_code)

        # stdout/stderr only exist when execute() returned, so the log files are written here only
        try:
            _stdout_name, _stderr_name = get_logfile_names(label)
            write_file(path.join(job.workdir, _stdout_name), stdout, mute=False)
            write_file(path.join(job.workdir, _stderr_name), stderr, mute=False)
        except PilotException as exc:
            msg = 'exception caught: %s' % exc
            if label == 'stage-in':
                raise StageInFailure(msg)
            raise StageOutFailure(msg)

    # transfer the per-file status and error info reported by the script to the job object
    handle_updated_job_object(job, xdata, label=label)
0141
0142
def get_script_path(script):
    """
    Locate a script shipped with the pilot sources.

    The sources are looked up in $PILOT_SOURCE_DIR (default: the current directory). The scripts directory
    is <srcdir>/pilot/scripts, or <srcdir>/pilot2/pilot/scripts when the former does not exist.

    :param script: script name (string).
    :return: full path of the script, or '' if the script does not exist (string).
    """

    source_dir = environ.get('PILOT_SOURCE_DIR', '.')

    scripts_dir = path.join(source_dir, 'pilot/scripts')
    if not path.exists(scripts_dir):
        # alternative layout with the sources nested in a 'pilot2' directory
        scripts_dir = path.join(source_dir, 'pilot2', 'pilot/scripts')

    candidate = path.join(scripts_dir, script)
    return candidate if path.exists(candidate) else ''
0161
0162
def get_command(job, xdata, queue, script, eventtype, localsite, remotesite, external_dir, label='stage-in', container_type='container'):
    """
    Get the middleware container execution command.

    Note: this function is tailor made for stage-in/out.

    For stage-in, the per-file data from get_filedata() is written as JSON to
    config.Container.stagein_replica_dictionary in the job work directory, and the script is given that name via
    --replicadictionary. For stage-out, the per-file data from get_filedata_strings() is passed as comma-separated
    command line options.

    Side effects:
    - the pilot source is copied into the job work directory (copy_pilot_source());
    - the job work directory is appended to PYTHONPATH of the current process;
    - the script is copied from <workdir>/pilot/scripts to the top of the work directory.

    :param job: job object.
    :param xdata: list of FileSpec objects.
    :param queue: queue name (string).
    :param script: name of stage-in/out script (string).
    :param eventtype: passed to the script as --eventtype.
    :param localsite: passed to the script as --localsite.
    :param remotesite: passed to the script as --remotesite.
    :param external_dir: input or output files directory (string).
    :param label: optional 'stage-[in|out]' (string); anything other than 'stage-out' is treated as stage-in.
    :param container_type: optional 'container/bash' (string).
    :return: stage-in/out command (string).
    :raises PilotException: for stage-in/out related failures
    """

    if label == 'stage-out':
        filedata_dictionary = get_filedata_strings(xdata)
    else:
        filedata_dictionary = get_filedata(xdata)

        # the stage-in script reads the file data back from this json file (see --replicadictionary below)
        try:
            status = write_json(path.join(job.workdir, config.Container.stagein_replica_dictionary), filedata_dictionary)
        except Exception as exc:
            diagnostics = 'exception caught in get_command(): %s' % exc
            logger.warning(diagnostics)
            raise PilotException(diagnostics)
        if not status:
            diagnostics = 'failed to write replica dictionary to file'
            logger.warning(diagnostics)
            raise PilotException(diagnostics)

    # copy_pilot_source() returns a non-empty diagnostics string on failure
    diagnostics = copy_pilot_source(job.workdir)
    if diagnostics:
        raise PilotException(diagnostics)

    final_script_path = path.join(job.workdir, script)

    # presumably so the script can import the pilot modules copied into the work directory;
    # PYTHONPATH may be unset, in which case the work directory becomes the whole value
    pythonpath = environ.get('PYTHONPATH')
    environ['PYTHONPATH'] = job.workdir if pythonpath is None else pythonpath + ':' + job.workdir

    copy(path.join(job.workdir, 'pilot/scripts', script), final_script_path)

    if container_type == 'container':
        # NOTE(review): assumes the work directory is available as /srv inside the container and the
        # script is started from there - confirm against create_middleware_container_command()
        final_script_path = path.join('.', script)
        workdir = '/srv'
    else:
        pilot_user = environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [pilot_user], 0)
        try:
            final_script_path = user.get_middleware_container_script('', final_script_path, asetup=True)
        except PilotException:
            # no user specific wrapper available; run the script directly
            final_script_path = 'python %s' % final_script_path
        workdir = job.workdir

    # NOTE(review): produserid only has its spaces escaped; a double quote in it would break the quoting
    # if this command is interpreted by a shell - confirm how execute() runs it
    cmd = "%s -d -w %s -q %s --eventtype=%s --localsite=%s --remotesite=%s --produserid=\"%s\" --jobid=%s" % \
        (final_script_path, workdir, queue, eventtype, localsite, remotesite, job.produserid.replace(' ', '%20'), job.jobid)

    if label == 'stage-in':
        cmd += " --eventservicemerge=%s --usepcache=%s --usevp=%s --replicadictionary=%s" % \
            (job.is_eventservicemerge, job.infosys.queuedata.use_pcache, job.use_vp, config.Container.stagein_replica_dictionary)
        if external_dir:
            cmd += ' --inputdir=%s' % external_dir
    else:
        cmd += ' --lfns=%s --scopes=%s --datasets=%s --ddmendpoints=%s --guids=%s' % \
            (filedata_dictionary['lfns'], filedata_dictionary['scopes'], filedata_dictionary['datasets'],
             filedata_dictionary['ddmendpoints'], filedata_dictionary['guids'])
        if external_dir:
            cmd += ' --outputdir=%s' % external_dir

    cmd += ' --taskid=%s' % job.taskid
    cmd += ' --jobdefinitionid=%s' % job.jobdefinitionid
    cmd += ' --catchall=%s' % job.infosys.queuedata.catchall

    if container_type == 'bash':
        # propagate the script's exit code when the command is run as a bash script
        cmd += '\nexit $?'

    return cmd
0249
0250
def handle_updated_job_object(job, xdata, label='stage-in'):
    """
    Handle updated job object fields.

    Reads the status dictionary written by the stage-in/out script into the job work directory
    (config.Container.stagein_status_dictionary or stageout_status_dictionary; if a file with the same name
    plus '.log' exists, that one is read instead) and copies its contents onto the file specs and the job.

    Each file spec's entry (keyed by LFN) is read positionally:
    - stage-in:  [status, status_code, turl, ddmendpoint]
    - stage-out: [status, status_code, surl, turl, adler32 checksum, filesize]
    The 'error' entry is read as [error_diag, error_code]; a truthy error_code is added to the job's
    pilot error codes/diagnostics.

    :param job: job object.
    :param xdata: list of FileSpec objects.
    :param label: 'stage-in/out' (string); anything other than 'stage-in' is treated as stage-out.
    :return:
    :raises: StageInFailure, StageOutFailure (when the dictionary is missing/empty or cannot be interpreted)
    """

    failure = StageInFailure if label == 'stage-in' else StageOutFailure
    dictionary_name = config.Container.stagein_status_dictionary if label == 'stage-in' else config.Container.stageout_status_dictionary

    # prefer the '.log' variant of the dictionary file when present
    if path.exists(path.join(job.workdir, dictionary_name + '.log')):
        dictionary_name += '.log'
    file_dictionary = read_json(path.join(job.workdir, dictionary_name))

    if not file_dictionary:
        msg = "%s file dictionary not found" % label
        logger.warning(msg)
        raise failure(msg)

    for fspec in xdata:
        try:
            entry = file_dictionary[fspec.lfn]
            fspec.status = entry[0]
            fspec.status_code = entry[1]
            if label == 'stage-in':
                fspec.turl = entry[2]
                fspec.ddmendpoint = entry[3]
            else:
                fspec.surl = entry[2]
                fspec.turl = entry[3]
                fspec.checksum['adler32'] = entry[4]
                fspec.filesize = entry[5]
        except Exception as exc:
            msg = "exception caught while reading file dictionary: %s" % exc
            logger.warning(msg)
            raise failure(msg)

    # a missing or malformed 'error' entry is reported like any other unreadable dictionary content
    try:
        error_diag = file_dictionary['error'][0]
        error_code = file_dictionary['error'][1]
    except (KeyError, IndexError, TypeError) as exc:
        msg = "exception caught while reading error info from file dictionary: %s" % exc
        logger.warning(msg)
        raise failure(msg)

    if error_code:
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error_code, msg=error_diag)
0304
0305
def get_logfile_names(label):
    """
    Return the file names for the redirected stdout/stderr of the stage-in/out script.

    The names are taken from config.Container (middleware_stage[in|out]_std[out|err]); an empty value
    falls back to a built-in default. Any label other than 'stage-in' is treated as stage-out.

    :param label: 'stage-[in|out]' (string)
    :return: 'stage[in|out]_stdout' (string), 'stage[in|out]_stderr' (string).
    """

    if label != 'stage-in':
        configured = (config.Container.middleware_stageout_stdout, config.Container.middleware_stageout_stderr)
        defaults = ('stageout_stdout.txt', 'stageout_stderr.txt')
    else:
        configured = (config.Container.middleware_stagein_stdout, config.Container.middleware_stagein_stderr)
        defaults = ('stagein_stdout.txt', 'stagein_stderr.txt')

    stdout_name = configured[0] or defaults[0]
    stderr_name = configured[1] or defaults[1]
    return stdout_name, stderr_name
0326
0327
def get_filedata(data):
    """
    Return a dictionary with per-file stage-in information, keyed by LFN.

    Note: this dictionary will be written to a file that will be read back by the stage-in script inside the container.
    Dictionary format:
    { lfn1: { 'guid': guid1, 'scope': scope1, 'dataset': dataset1, 'ddmendpoint': ddmendpoint1,
              'filesize': filesize1, 'checksum': checksum1, 'allowlan': allowlan1, 'allowwan': allowwan1,
              'directaccesslan': directaccesslan1, 'directaccesswan': directaccesswan1, 'istar': istar1,
              'accessmode': accessmode1, 'storagetoken': storagetoken1}, lfn2: .. }

    'checksum' holds the md5 value when the spec has an md5 but no adler32 checksum; otherwise it holds the
    adler32 value, or the string 'None' when there is none. A spec that raises while being read is logged
    and left out of the result.

    :param data: iterable of FileSpec objects.
    :return: {lfn: {...}} (dictionary).
    """

    replicas = {}
    for spec in data:
        try:
            algorithm = 'adler32'
            if 'md5' in spec.checksum and 'adler32' not in spec.checksum:
                algorithm = 'md5'
            entry = dict(guid=spec.guid,
                         scope=spec.scope,
                         dataset=spec.dataset,
                         ddmendpoint=spec.ddmendpoint,
                         filesize=spec.filesize,
                         checksum=spec.checksum.get(algorithm, 'None'),
                         allowlan=spec.allow_lan,
                         allowwan=spec.allow_wan,
                         directaccesslan=spec.direct_access_lan,
                         directaccesswan=spec.direct_access_wan,
                         istar=spec.is_tar,
                         accessmode=spec.accessmode,
                         storagetoken=spec.storage_token)
            replicas[spec.lfn] = entry
        except Exception as exc:
            logger.warning('exception caught in get_filedata(): %s', exc)

    return replicas
0364
0365
def get_filedata_strings(data):
    """
    Return a dictionary with comma-separated list of LFNs, guids, scopes, datasets, ddmendpoints, etc.

    Every value holds exactly one comma-separated item per file spec, in input order, so the lists stay
    positionally aligned with 'lfns' even when a value is empty or None (None is rendered as 'None').
    'checksums' holds the md5 value when a spec has md5 but no adler32, otherwise the adler32 value ('None'
    if missing). A falsy accessmode or storage token is rendered as 'None'. An empty input gives '' everywhere.

    :param data: job [in|out]data (iterable of FileSpec objects).
    :return: {'lfns': lfns, ..} (dictionary).
    """

    def _checksum(fspec):
        _type = 'md5' if ('md5' in fspec.checksum and 'adler32' not in fspec.checksum) else 'adler32'
        return fspec.checksum.get(_type, 'None')

    # (output key, value getter) pairs, in the key order of the returned dictionary
    fields = (
        ('lfns', lambda fspec: fspec.lfn),
        ('guids', lambda fspec: fspec.guid),
        ('scopes', lambda fspec: fspec.scope),
        ('datasets', lambda fspec: fspec.dataset),
        ('ddmendpoints', lambda fspec: fspec.ddmendpoint),
        ('filesizes', lambda fspec: fspec.filesize),
        ('checksums', _checksum),
        ('allowlans', lambda fspec: fspec.allow_lan),
        ('allowwans', lambda fspec: fspec.allow_wan),
        ('directaccesslans', lambda fspec: fspec.direct_access_lan),
        ('directaccesswans', lambda fspec: fspec.direct_access_wan),
        ('istars', lambda fspec: fspec.is_tar),
        ('accessmodes', lambda fspec: fspec.accessmode if fspec.accessmode else 'None'),
        ('storagetokens', lambda fspec: fspec.storage_token if fspec.storage_token else 'None'),
    )

    # materialise once: the specs are traversed once per field
    specs = list(data)
    # '%s' % (value,) formats exactly like the previous ",%s" accumulation, including None -> 'None'
    return dict((key, ','.join('%s' % (getter(fspec),) for fspec in specs)) for key, getter in fields)
0411
0412
def use_middleware_script(container_type):
    """
    Should the pilot use a script for the stage-in/out?

    Check the container_type (from queuedata) if 'middleware' is set to 'container' or 'bash'.
    The comparison is exact (case-sensitive); any other value, including None, gives False.

    :param container_type: container type (string).
    :return: Boolean (True if middleware should be done by a script, containerised or not).
    """

    return container_type in ('container', 'bash')
0422 return True if container_type == 'container' or container_type == 'bash' else False