Warning, file /pilot2/pilot/user/generic/common.py was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 from signal import SIGTERM
0012
0013 from pilot.common.exception import TrfDownloadFailure
0014 from pilot.util.config import config
0015 from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED
0016 from pilot.util.filehandling import read_file
0017 from .setup import get_analysis_trf
0018
0019 import logging
0020 logger = logging.getLogger(__name__)
0021
0022
0023 def sanity_check():
0024 """
0025 Perform an initial sanity check before doing anything else in a given workflow.
0026 This function can be used to verify importing of modules that are otherwise used much later, but it is better to abort
0027 the pilot if a problem is discovered early.
0028
0029 :return: exit code (0 if all is ok, otherwise non-zero exit code).
0030 """
0031
0032 return 0
0033
0034
0035 def validate(job):
0036 """
0037 Perform user specific payload/job validation.
0038
0039 :param job: job object.
0040 :return: Boolean (True if validation is successful).
0041 """
0042
0043 return True
0044
0045
0046 def get_payload_command(job):
0047 """
0048 Return the full command for executing the payload, including the sourcing of all setup files and setting of
0049 environment variables.
0050
0051 By default, the full payload command is assumed to be in the job.jobparams.
0052
0053 :param job: job object
0054 :return: command (string)
0055 """
0056
0057
0058
0059
0060
0061 ec, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir)
0062 if ec != 0:
0063 raise TrfDownloadFailure(diagnostics)
0064 else:
0065 logger.debug('user analysis trf: %s' % trf_name)
0066
0067 return get_analysis_run_command(job, trf_name)
0068
0069
0070 def get_analysis_run_command(job, trf_name):
0071 """
0072 Return the proper run command for the user job.
0073
0074 Example output: export X509_USER_PROXY=<..>;./runAthena <job parameters> --usePFCTurl --directIn
0075
0076 :param job: job object.
0077 :param trf_name: name of the transform that will run the job (string). Used when containers are not used.
0078 :return: command (string).
0079 """
0080
0081 cmd = ""
0082
0083
0084 if 'X509_USER_PROXY' in os.environ and not job.imagename:
0085 cmd += 'export X509_USER_PROXY=%s;' % os.environ.get('X509_USER_PROXY')
0086
0087
0088 if job.imagename == "":
0089 cmd += './%s %s' % (trf_name, job.jobparams)
0090 else:
0091 if trf_name:
0092 cmd += './%s %s' % (trf_name, job.jobparams)
0093 else:
0094 cmd += 'python %s %s' % (trf_name, job.jobparams)
0095
0096 return cmd
0097
0098
0099 def update_job_data(job):
0100 """
0101 This function can be used to update/add data to the job object.
0102 E.g. user specific information can be extracted from other job object fields. In the case of ATLAS, information
0103 is extracted from the metaData field and added to other job object fields.
0104
0105 :param job: job object
0106 :return:
0107 """
0108
0109 pass
0110
0111
0112 def remove_redundant_files(workdir, outputfiles=[], islooping=False, debugmode=False):
0113 """
0114 Remove redundant files and directories prior to creating the log file.
0115
0116 :param workdir: working directory (string).
0117 :param outputfiles: list of output files.
0118 :param islooping: looping job variable to make sure workDir is not removed in case of looping (boolean).
0119 :return:
0120 """
0121
0122 pass
0123
0124
0125 def get_utility_commands(order=None, job=None):
0126 """
0127 Return a dictionary of utility commands and arguments to be executed in parallel with the payload.
0128 This could e.g. be memory and network monitor commands. A separate function can be used to determine the
0129 corresponding command setups using the utility command name.
0130 If the optional order parameter is set, the function should return the list of corresponding commands.
0131 E.g. if order=UTILITY_BEFORE_PAYLOAD, the function should return all commands that are to be executed before the
0132 payload. If order=UTILITY_WITH_PAYLOAD, the corresponding commands will be prepended to the payload execution
0133 string. If order=UTILITY_AFTER_PAYLOAD_STARTED, the commands that should be executed after the payload has been started
0134 should be returned.
0135
0136 FORMAT: {'command': <command>, 'args': <args>}
0137
0138 :param order: optional sorting order (see pilot.util.constants)
0139 :param job: optional job object.
0140 :return: dictionary of utilities to be executed in parallel with the payload.
0141 """
0142
0143 return {}
0144
0145
0146 def get_utility_command_setup(name, setup=None):
0147 """
0148 Return the proper setup for the given utility command.
0149 If a payload setup is specified
0150 :param name:
0151 :param setup:
0152 :return:
0153 """
0154
0155 pass
0156
0157
0158 def get_utility_command_execution_order(name):
0159 """
0160 Should the given utility command be executed before or after the payload?
0161
0162 :param name: utility name (string).
0163 :return: execution order constant (UTILITY_BEFORE_PAYLOAD or UTILITY_AFTER_PAYLOAD_STARTED)
0164 """
0165
0166
0167 if name == 'monitor':
0168 return UTILITY_BEFORE_PAYLOAD
0169 else:
0170 return UTILITY_AFTER_PAYLOAD_STARTED
0171
0172
0173 def post_utility_command_action(name, job):
0174 """
0175 Perform post action for given utility command.
0176
0177 :param name: name of utility command (string).
0178 :param job: job object.
0179 :return:
0180 """
0181
0182 pass
0183
0184
0185 def get_utility_command_kill_signal(name):
0186 """
0187 Return the proper kill signal used to stop the utility command.
0188
0189 :param name:
0190 :return: kill signal
0191 """
0192
0193 return SIGTERM
0194
0195
0196 def get_utility_command_output_filename(name, selector=None):
0197 """
0198 Return the filename to the output of the utility command.
0199
0200 :param name: utility name (string).
0201 :param selector: optional special conditions flag (boolean).
0202 :return: filename (string).
0203 """
0204
0205 return ""
0206
0207
0208 def verify_job(job):
0209 """
0210 Verify job parameters for specific errors.
0211 Note:
0212 in case of problem, the function should set the corresponding pilot error code using
0213 job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())
0214
0215 :param job: job object
0216 :return: Boolean.
0217 """
0218
0219 return True
0220
0221
0222 def update_stagein(job):
0223 """
0224 In case special files need to be skipped during stage-in, the job.indata list can be updated here.
0225 See ATLAS code for an example.
0226
0227 :param job: job object.
0228 :return:
0229 """
0230
0231 pass
0232
0233
0234 def get_metadata(workdir):
0235 """
0236 Return the metadata from file.
0237
0238 :param workdir: work directory (string)
0239 :return:
0240 """
0241
0242 path = os.path.join(workdir, config.Payload.jobreport)
0243 metadata = read_file(path) if os.path.exists(path) else None
0244
0245 return metadata
0246
0247
0248 def update_server(job):
0249 """
0250 Perform any user specific server actions.
0251
0252 E.g. this can be used to send special information to a logstash.
0253
0254 :param job: job object.
0255 :return:
0256 """
0257
0258 pass
0259
0260
0261 def post_prestagein_utility_command(**kwargs):
0262 """
0263 Execute any post pre-stage-in utility commands.
0264
0265 :param kwargs: kwargs (dictionary).
0266 :return:
0267 """
0268
0269
0270
0271
0272 pass
0273
0274
0275 def process_debug_command(debug_command, pandaid):
0276 """
0277 In debug mode, the server can send a special debug command to the pilot via the updateJob backchannel.
0278 This function can be used to process that command, i.e. to identify a proper pid to debug (which is unknown
0279 to the server).
0280
0281 :param debug_command: debug command (string), payload pid (int).
0282 :param pandaid: PanDA id (string).
0283 :return: updated debug command (string)
0284 """
0285
0286 return debug_command
0287
0288
0289 def allow_timefloor(submitmode):
0290 """
0291 Should the timefloor mechanism (multi-jobs) be allowed for the given submit mode?
0292
0293 :param submitmode: submit mode (string).
0294 """
0295
0296 return True