File indexing completed on 2026-04-11 08:41:05
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 from signal import SIGTERM
0012
0013 from pilot.common.exception import TrfDownloadFailure
0014 from pilot.util.config import config
0015 from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED
0016 from pilot.util.filehandling import read_file
0017 from .setup import get_analysis_trf
0018
0019 import logging
0020 logger = logging.getLogger(__name__)
0021
0022
def sanity_check():
    """
    Run the initial sanity check of the workflow.

    This hook is the place to verify, as early as possible, things that would otherwise only fail much later
    (e.g. importing of modules), so that the pilot can be aborted before any real work is done.
    This implementation performs no checks and always reports success.

    :return: exit code (0 if all is ok, otherwise non-zero exit code).
    """

    exit_code = 0
    return exit_code
0033
0034
def validate(job):
    """
    Run the user specific validation of the payload/job.

    This implementation does not inspect the job and accepts every job.

    :param job: job object (unused here).
    :return: Boolean (True if validation is successful; always True here).
    """

    is_valid = True
    return is_valid
0044
0045
def get_payload_command(job):
    """
    Build the full command used to execute the payload.

    The transform is looked up with get_analysis_trf() using job.transformation and job.workdir; the resulting
    transform name is then handed to get_analysis_run_command(), which assembles the actual command string.

    :param job: job object (job.transformation and job.workdir are read here).
    :raises TrfDownloadFailure: if get_analysis_trf() returns a non-zero exit code (the returned diagnostics
        are used as the exception message).
    :return: command (string)
    """

    exit_code, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir)

    # bail out immediately if the transform could not be obtained
    if exit_code != 0:
        raise TrfDownloadFailure(diagnostics)

    logger.debug('user analysis trf: %s' % trf_name)

    return get_analysis_run_command(job, trf_name)
0068
0069
def get_analysis_run_command(job, trf_name):
    """
    Assemble the run command for the user job.

    Example output: export X509_USER_PROXY=<..>;./runAthena <job parameters> --usePFCTurl --directIn

    The command consists of an optional proxy export followed by the transform invocation:
    - the proxy export is added only if X509_USER_PROXY is set in the environment and job.imagename is empty;
    - the transform is launched as './<trf_name> <jobparams>' when job.imagename is an empty string or when
      trf_name is non-empty, otherwise as 'python <trf_name> <jobparams>'.

    :param job: job object (job.imagename and job.jobparams are read here).
    :param trf_name: name of the transform that will run the job (string). Used when containers are not used.
    :return: command (string).
    """

    pieces = []

    # forward the user proxy to the payload, but only when no image name is set
    if 'X509_USER_PROXY' in os.environ and not job.imagename:
        pieces.append('export X509_USER_PROXY=%s;' % os.environ.get('X509_USER_PROXY'))

    # NOTE(review): the 'python' form is only reached with a falsy trf_name, so the interpolated transform
    # name is empty in that case - confirm whether another field was intended there
    if job.imagename == "" or trf_name:
        template = './%s %s'
    else:
        template = 'python %s %s'
    pieces.append(template % (trf_name, job.jobparams))

    return ''.join(pieces)
0097
0098
def update_job_data(job):
    """
    Hook for updating/adding data to the job object.

    E.g. user specific information can be extracted from other job object fields. In the case of ATLAS,
    information is extracted from the metaData field and added to other job object fields.
    This implementation is a no-op and leaves the job object untouched.

    :param job: job object
    :return:
    """

    return None
0110
0111
def remove_redundant_files(workdir, outputfiles=None, islooping=False, debugmode=False):
    """
    Remove redundant files and directories prior to creating the log file.

    This implementation is a no-op: nothing is removed and all arguments are ignored.

    :param workdir: working directory (string).
    :param outputfiles: list of output files (None is treated as "no output files").
    :param islooping: looping job variable to make sure workDir is not removed in case of looping (boolean).
    :param debugmode: True if debug mode has been switched on (Boolean).
    :return:
    """

    # the default for outputfiles is None rather than a list literal, so that no single list object is shared
    # between calls once an implementation starts modifying it
    return None
0124
0125
def get_utility_commands(order=None, job=None):
    """
    Return the utility commands (and their arguments) that are to be executed in parallel with the payload.

    Such utilities could e.g. be memory and network monitor commands. A separate function can be used to
    determine the corresponding command setups using the utility command name.
    The optional order parameter selects which commands are wanted:
    order=UTILITY_BEFORE_PAYLOAD for the commands to be executed before the payload,
    order=UTILITY_WITH_PAYLOAD for the commands that will be prepended to the payload execution string, and
    order=UTILITY_AFTER_PAYLOAD_STARTED for the commands to be executed after the payload has been started.

    FORMAT: {'command': <command>, 'args': <args>}

    This implementation defines no utilities: both arguments are ignored and an empty dictionary is returned.

    :param order: optional sorting order (see pilot.util.constants)
    :param job: optional job object.
    :return: dictionary of utilities to be executed in parallel with the payload.
    """

    utilities = {}
    return utilities
0145
0146
def get_utility_command_setup(name, setup=None):
    """
    Return the proper setup for the given utility command.

    This implementation is a placeholder: both arguments are ignored and None is returned.

    :param name: utility command name (unused here).
    :param setup: optional payload setup (unused here).
    :return: None.
    """

    return None
0157
0158
def get_utility_command_execution_order(name):
    """
    Tell whether the given utility command is to be executed before or after the payload.

    Only the utility named 'monitor' is scheduled before the payload; every other name is scheduled after
    the payload has been started.

    :param name: utility name (string).
    :return: execution order constant (UTILITY_BEFORE_PAYLOAD or UTILITY_AFTER_PAYLOAD_STARTED)
    """

    runs_before_payload = (name == 'monitor')

    return UTILITY_BEFORE_PAYLOAD if runs_before_payload else UTILITY_AFTER_PAYLOAD_STARTED
0172
0173
def post_utility_command_action(name, job):
    """
    Hook for the post action of the given utility command.

    This implementation is a no-op; both arguments are ignored.

    :param name: name of utility command (string).
    :param job: job object.
    :return:
    """

    return None
0184
0185
def get_utility_command_kill_signal(name):
    """
    Return the kill signal to be used for stopping the utility command.

    The same signal (SIGTERM) is returned for every utility; the name is not inspected.

    :param name: utility name (unused here).
    :return: kill signal
    """

    stop_signal = SIGTERM
    return stop_signal
0195
0196
def get_utility_command_output_filename(name, selector=None):
    """
    Return the filename of the utility command output.

    This implementation knows of no utility output files: both arguments are ignored and an empty string
    is returned.

    :param name: utility name (string).
    :param selector: optional special conditions flag (boolean).
    :return: filename (string).
    """

    filename = ""
    return filename
0207
0208
def verify_job(job):
    """
    Verify the job parameters for specific errors.

    Note:
      in case of problem, the function should set the corresponding pilot error code using
      job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())

    This implementation performs no verification and accepts every job.

    :param job: job object
    :return: Boolean (always True here).
    """

    verified = True
    return verified
0221
0222
def update_stagein(job):
    """
    Hook for updating the job.indata list, in case special files need to be skipped during stage-in.

    See ATLAS code for an example. This implementation is a no-op and leaves the job object untouched.

    :param job: job object.
    :return:
    """

    return None
0233
0234
def get_metadata(workdir):
    """
    Return the metadata read from the job report file.

    The file name is taken from config.Payload.jobreport and is looked up inside the given work directory.

    :param workdir: work directory (string)
    :return: the value returned by read_file() for the job report, or None if the file does not exist.
    """

    report_path = os.path.join(workdir, config.Payload.jobreport)

    # no job report present, i.e. there is nothing to read
    if not os.path.exists(report_path):
        return None

    return read_file(report_path)
0247
0248
def update_server(job):
    """
    Hook for user specific server actions.

    E.g. this can be used to send special information to a logstash.
    This implementation is a no-op.

    :param job: job object.
    :return:
    """

    return None
0260
0261
def post_prestagein_utility_command(**kwargs):
    """
    Hook for executing any post pre-stage-in utility commands.

    This implementation is a no-op; all keyword arguments are accepted and ignored.

    :param kwargs: kwargs (dictionary).
    :return:
    """

    return None
0274
0275
def process_debug_command(debug_command, pandaid):
    """
    Process the special debug command sent by the server.

    In debug mode, the server can send a debug command to the pilot via the updateJob backchannel. This hook
    can be used to process that command, i.e. to identify a proper pid to debug (which is unknown to the
    server). This implementation hands the command back unchanged and does not use the PanDA id.

    :param debug_command: debug command (string).
    :param pandaid: PanDA id (string).
    :return: updated debug command (string)
    """

    updated_command = debug_command
    return updated_command
0288
0289
def allow_timefloor(submitmode):
    """
    Tell whether the timefloor mechanism (multi-jobs) is allowed for the given submit mode.

    The mechanism is allowed for every submit mode except 'push' (compared case-insensitively); for push mode
    an info message is logged and the mechanism is switched off.

    :param submitmode: submit mode (string).
    :return: Boolean (False for submit mode 'push', otherwise True).
    """

    if submitmode.lower() != 'push':
        return True

    logger.info('Since the submitmode=push, override timefloor with zero manually')
    return False
0302 return allow