Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /pilot2/pilot/user/generic/common.py was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2020
0009 
0010 import os
0011 from signal import SIGTERM
0012 
0013 from pilot.common.exception import TrfDownloadFailure
0014 from pilot.util.config import config
0015 from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED
0016 from pilot.util.filehandling import read_file
0017 from .setup import get_analysis_trf
0018 
0019 import logging
0020 logger = logging.getLogger(__name__)
0021 
0022 
0023 def sanity_check():
0024     """
0025     Perform an initial sanity check before doing anything else in a given workflow.
0026     This function can be used to verify importing of modules that are otherwise used much later, but it is better to abort
0027     the pilot if a problem is discovered early.
0028 
0029     :return: exit code (0 if all is ok, otherwise non-zero exit code).
0030     """
0031 
0032     return 0
0033 
0034 
0035 def validate(job):
0036     """
0037     Perform user specific payload/job validation.
0038 
0039     :param job: job object.
0040     :return: Boolean (True if validation is successful).
0041     """
0042 
0043     return True
0044 
0045 
0046 def get_payload_command(job):
0047     """
0048     Return the full command for executing the payload, including the sourcing of all setup files and setting of
0049     environment variables.
0050 
0051     By default, the full payload command is assumed to be in the job.jobparams.
0052 
0053     :param job: job object
0054     :return: command (string)
0055     """
0056 
0057     # Try to download the trf
0058     # if job.imagename != "" or "--containerImage" in job.jobparams:
0059     #    job.transformation = os.path.join(os.path.dirname(job.transformation), "runcontainer")
0060     #    logger.warning('overwrote job.transformation, now set to: %s' % job.transformation)
0061     ec, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir)
0062     if ec != 0:
0063         raise TrfDownloadFailure(diagnostics)
0064     else:
0065         logger.debug('user analysis trf: %s' % trf_name)
0066 
0067     return get_analysis_run_command(job, trf_name)
0068 
0069 
0070 def get_analysis_run_command(job, trf_name):
0071     """
0072     Return the proper run command for the user job.
0073 
0074     Example output: export X509_USER_PROXY=<..>;./runAthena <job parameters> --usePFCTurl --directIn
0075 
0076     :param job: job object.
0077     :param trf_name: name of the transform that will run the job (string). Used when containers are not used.
0078     :return: command (string).
0079     """
0080 
0081     cmd = ""
0082 
0083     # add the user proxy
0084     if 'X509_USER_PROXY' in os.environ and not job.imagename:
0085         cmd += 'export X509_USER_PROXY=%s;' % os.environ.get('X509_USER_PROXY')
0086 
0087     # set up trfs
0088     if job.imagename == "":  # user jobs with no imagename defined
0089         cmd += './%s %s' % (trf_name, job.jobparams)
0090     else:
0091         if trf_name:
0092             cmd += './%s %s' % (trf_name, job.jobparams)
0093         else:
0094             cmd += 'python %s %s' % (trf_name, job.jobparams)
0095 
0096     return cmd
0097 
0098 
0099 def update_job_data(job):
0100     """
0101     This function can be used to update/add data to the job object.
0102     E.g. user specific information can be extracted from other job object fields. In the case of ATLAS, information
0103     is extracted from the metaData field and added to other job object fields.
0104 
0105     :param job: job object
0106     :return:
0107     """
0108 
0109     pass
0110 
0111 
0112 def remove_redundant_files(workdir, outputfiles=[], islooping=False, debugmode=False):
0113     """
0114     Remove redundant files and directories prior to creating the log file.
0115 
0116     :param workdir: working directory (string).
0117     :param outputfiles: list of output files.
0118     :param islooping: looping job variable to make sure workDir is not removed in case of looping (boolean).
0119     :return:
0120     """
0121 
0122     pass
0123 
0124 
0125 def get_utility_commands(order=None, job=None):
0126     """
0127     Return a dictionary of utility commands and arguments to be executed in parallel with the payload.
0128     This could e.g. be memory and network monitor commands. A separate function can be used to determine the
0129     corresponding command setups using the utility command name.
0130     If the optional order parameter is set, the function should return the list of corresponding commands.
0131     E.g. if order=UTILITY_BEFORE_PAYLOAD, the function should return all commands that are to be executed before the
0132     payload. If order=UTILITY_WITH_PAYLOAD, the corresponding commands will be prepended to the payload execution
0133     string. If order=UTILITY_AFTER_PAYLOAD_STARTED, the commands that should be executed after the payload has been started
0134     should be returned.
0135 
0136     FORMAT: {'command': <command>, 'args': <args>}
0137 
0138     :param order: optional sorting order (see pilot.util.constants)
0139     :param job: optional job object.
0140     :return: dictionary of utilities to be executed in parallel with the payload.
0141     """
0142 
0143     return {}
0144 
0145 
0146 def get_utility_command_setup(name, setup=None):
0147     """
0148     Return the proper setup for the given utility command.
0149     If a payload setup is specified
0150     :param name:
0151     :param setup:
0152     :return:
0153     """
0154 
0155     pass
0156 
0157 
0158 def get_utility_command_execution_order(name):
0159     """
0160     Should the given utility command be executed before or after the payload?
0161 
0162     :param name: utility name (string).
0163     :return: execution order constant (UTILITY_BEFORE_PAYLOAD or UTILITY_AFTER_PAYLOAD_STARTED)
0164     """
0165 
0166     # example implementation
0167     if name == 'monitor':
0168         return UTILITY_BEFORE_PAYLOAD
0169     else:
0170         return UTILITY_AFTER_PAYLOAD_STARTED
0171 
0172 
0173 def post_utility_command_action(name, job):
0174     """
0175     Perform post action for given utility command.
0176 
0177     :param name: name of utility command (string).
0178     :param job: job object.
0179     :return:
0180     """
0181 
0182     pass
0183 
0184 
0185 def get_utility_command_kill_signal(name):
0186     """
0187     Return the proper kill signal used to stop the utility command.
0188 
0189     :param name:
0190     :return: kill signal
0191     """
0192 
0193     return SIGTERM
0194 
0195 
0196 def get_utility_command_output_filename(name, selector=None):
0197     """
0198     Return the filename to the output of the utility command.
0199 
0200     :param name: utility name (string).
0201     :param selector: optional special conditions flag (boolean).
0202     :return: filename (string).
0203     """
0204 
0205     return ""
0206 
0207 
0208 def verify_job(job):
0209     """
0210     Verify job parameters for specific errors.
0211     Note:
0212       in case of problem, the function should set the corresponding pilot error code using
0213       job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())
0214 
0215     :param job: job object
0216     :return: Boolean.
0217     """
0218 
0219     return True
0220 
0221 
0222 def update_stagein(job):
0223     """
0224     In case special files need to be skipped during stage-in, the job.indata list can be updated here.
0225     See ATLAS code for an example.
0226 
0227     :param job: job object.
0228     :return:
0229     """
0230 
0231     pass
0232 
0233 
0234 def get_metadata(workdir):
0235     """
0236     Return the metadata from file.
0237 
0238     :param workdir: work directory (string)
0239     :return:
0240     """
0241 
0242     path = os.path.join(workdir, config.Payload.jobreport)
0243     metadata = read_file(path) if os.path.exists(path) else None
0244 
0245     return metadata
0246 
0247 
0248 def update_server(job):
0249     """
0250     Perform any user specific server actions.
0251 
0252     E.g. this can be used to send special information to a logstash.
0253 
0254     :param job: job object.
0255     :return:
0256     """
0257 
0258     pass
0259 
0260 
0261 def post_prestagein_utility_command(**kwargs):
0262     """
0263     Execute any post pre-stage-in utility commands.
0264 
0265     :param kwargs: kwargs (dictionary).
0266     :return:
0267     """
0268 
0269     # label = kwargs.get('label', 'unknown_label')
0270     # stdout = kwargs.get('output', None)
0271 
0272     pass
0273 
0274 
0275 def process_debug_command(debug_command, pandaid):
0276     """
0277     In debug mode, the server can send a special debug command to the pilot via the updateJob backchannel.
0278     This function can be used to process that command, i.e. to identify a proper pid to debug (which is unknown
0279     to the server).
0280 
0281     :param debug_command: debug command (string), payload pid (int).
0282     :param pandaid: PanDA id (string).
0283     :return: updated debug command (string)
0284     """
0285 
0286     return debug_command
0287 
0288 
0289 def allow_timefloor(submitmode):
0290     """
0291     Should the timefloor mechanism (multi-jobs) be allowed for the given submit mode?
0292 
0293     :param submitmode: submit mode (string).
0294     """
0295 
0296     return True