Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:05

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0009 
0010 import os
0011 from signal import SIGTERM
0012 
0013 from pilot.common.exception import TrfDownloadFailure
0014 from pilot.util.config import config
0015 from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED
0016 from pilot.util.filehandling import read_file
0017 from .setup import get_analysis_trf
0018 
0019 import logging
0020 logger = logging.getLogger(__name__)
0021 
0022 
def sanity_check():
    """
    Run the initial sanity check for this workflow.

    The hook exists so that problems (e.g. failing imports of modules that are only needed much later) can be
    detected early and the pilot aborted. This implementation performs no checks and always reports success.

    :return: exit code (int), always 0 here; a non-zero value would signal a problem.
    """

    exit_code = 0  # nothing is verified in this implementation
    return exit_code
0033 
0034 
def validate(job):
    """
    Validate the payload/job in a user specific way.

    This implementation does not inspect the job and accepts every job.

    :param job: job object (unused here).
    :return: Boolean, always True (validation successful).
    """

    is_valid = True  # no user specific checks are implemented
    return is_valid
0044 
0045 
def get_payload_command(job):
    """
    Build the full command used to execute the payload.

    The transform is first fetched with get_analysis_trf(), using job.transformation and job.workdir. The
    actual command string is then assembled by get_analysis_run_command(), which appends job.jobparams.

    :param job: job object.
    :return: command (string).
    :raises TrfDownloadFailure: if get_analysis_trf() returns a non-zero exit code.
    """

    # Disabled container override, kept for reference:
    # if job.imagename != "" or "--containerImage" in job.jobparams:
    #    job.transformation = os.path.join(os.path.dirname(job.transformation), "runcontainer")
    #    logger.warning('overwrote job.transformation, now set to: %s' % job.transformation)

    # Try to download the trf
    exit_code, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir)
    if exit_code != 0:
        raise TrfDownloadFailure(diagnostics)

    logger.debug('user analysis trf: %s' % trf_name)

    return get_analysis_run_command(job, trf_name)
0068 
0069 
def get_analysis_run_command(job, trf_name):
    """
    Assemble the run command for the user job.

    Example output: export X509_USER_PROXY=<..>;./runAthena <job parameters> --usePFCTurl --directIn

    The user proxy export is only prepended when X509_USER_PROXY is set in the environment and the job has no
    image name. The transform is launched as './<trf_name>' unless an image name is set and trf_name is empty,
    in which case the command is prefixed with 'python' instead.

    :param job: job object (job.imagename and job.jobparams are read).
    :param trf_name: name of the transform that will run the job (string).
    :return: command (string).
    """

    pieces = []

    # add the user proxy (not for jobs that define an image name)
    proxy = os.environ.get('X509_USER_PROXY')
    if proxy is not None and not job.imagename:
        pieces.append('export X509_USER_PROXY=%s;' % proxy)

    # set up trfs: execute the transform directly for jobs without an image name, or whenever a transform
    # name is available; otherwise fall back to the 'python' prefix
    run_directly = job.imagename == "" or bool(trf_name)
    template = './%s %s' if run_directly else 'python %s %s'
    pieces.append(template % (trf_name, job.jobparams))

    return ''.join(pieces)
0097 
0098 
def update_job_data(job):
    """
    Hook for updating or adding data to the job object.

    User specific information can e.g. be extracted from other job object fields (the ATLAS code is said to
    extract information from the metaData field). This implementation is a no-op.

    :param job: job object (left untouched here).
    :return: None.
    """

    return None
0110 
0111 
def remove_redundant_files(workdir, outputfiles=None, islooping=False, debugmode=False):
    """
    Remove redundant files and directories prior to creating the log file.

    This implementation is a no-op; none of the arguments are used.

    Note: the default for outputfiles is None rather than a list literal, so that no mutable object is shared
    between calls once an implementation is added.

    :param workdir: working directory (string).
    :param outputfiles: list of output files (None is treated as no output files).
    :param islooping: looping job variable to make sure workDir is not removed in case of looping (boolean).
    :param debugmode: True if debug mode has been switched on (Boolean).
    :return: None.
    """

    pass
0124 
0125 
def get_utility_commands(order=None, job=None):
    """
    Return the utility commands (with arguments) that should run alongside the payload.

    Such utilities could e.g. be memory or network monitors. The intended contract is that the optional order
    parameter selects which commands are returned: UTILITY_BEFORE_PAYLOAD for commands executed before the
    payload, UTILITY_WITH_PAYLOAD for commands prepended to the payload execution string, and
    UTILITY_AFTER_PAYLOAD_STARTED for commands executed once the payload has been started.

    This implementation defines no utilities and ignores both arguments.

    FORMAT: {'command': <command>, 'args': <args>}

    :param order: optional sorting order (see pilot.util.constants); unused here.
    :param job: optional job object; unused here.
    :return: dictionary of utilities to be executed in parallel with the payload (always empty here).
    """

    utilities = {}  # a fresh, empty dictionary on every call
    return utilities
0145 
0146 
def get_utility_command_setup(name, setup=None):
    """
    Return the setup for the given utility command.

    This implementation is a placeholder: both arguments are ignored and nothing is returned.

    :param name: utility command name; unused here.
    :param setup: optional payload setup; unused here.
    :return: None.
    """

    return None
0157 
0158 
def get_utility_command_execution_order(name):
    """
    Decide whether the given utility command runs before or after the payload has started.

    Example implementation: only the utility named 'monitor' is scheduled before the payload.

    :param name: utility name (string).
    :return: execution order constant (UTILITY_BEFORE_PAYLOAD or UTILITY_AFTER_PAYLOAD_STARTED).
    """

    return UTILITY_BEFORE_PAYLOAD if name == 'monitor' else UTILITY_AFTER_PAYLOAD_STARTED
0172 
0173 
def post_utility_command_action(name, job):
    """
    Hook for a post action belonging to the given utility command.

    This implementation is a no-op.

    :param name: name of utility command (string); unused here.
    :param job: job object; unused here.
    :return: None.
    """

    return None
0184 
0185 
def get_utility_command_kill_signal(name):
    """
    Return the kill signal used to stop the utility command.

    The same signal, SIGTERM, is returned for every utility.

    :param name: utility name; unused here.
    :return: kill signal (signal.SIGTERM).
    """

    kill_signal = SIGTERM
    return kill_signal
0195 
0196 
def get_utility_command_output_filename(name, selector=None):
    """
    Return the name of the file holding the output of the utility command.

    This implementation knows no utility output files and always returns an empty string.

    :param name: utility name (string); unused here.
    :param selector: optional special conditions flag (boolean); unused here.
    :return: filename (string), always empty here.
    """

    filename = ""
    return filename
0207 
0208 
def verify_job(job):
    """
    Verify the job parameters for specific errors.

    Note:
      an implementation that detects a problem is expected to set the corresponding pilot error code using
      job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())

    This implementation performs no verification and accepts every job.

    :param job: job object (unused here).
    :return: Boolean, always True.
    """

    verified = True  # no checks are implemented
    return verified
0221 
0222 
def update_stagein(job):
    """
    Hook for adjusting the job.indata list, e.g. when special files need to be skipped during stage-in.

    See the ATLAS code for an example. This implementation is a no-op.

    :param job: job object (left untouched here).
    :return: None.
    """

    return None
0233 
0234 
def get_metadata(workdir):
    """
    Read the metadata from the job report file in the work directory.

    The file name is taken from config.Payload.jobreport and the file is read with read_file().

    :param workdir: work directory (string).
    :return: whatever read_file() returns for the job report, or None if the file does not exist.
    """

    report_path = os.path.join(workdir, config.Payload.jobreport)
    if not os.path.exists(report_path):
        return None

    return read_file(report_path)
0247 
0248 
def update_server(job):
    """
    Hook for user specific server actions.

    It could e.g. be used to send special information to a logstash. This implementation is a no-op.

    :param job: job object (unused here).
    :return: None.
    """

    return None
0260 
0261 
def post_prestagein_utility_command(**kwargs):
    """
    Hook executed after the pre-stage-in utility commands.

    This implementation is a no-op and ignores all keyword arguments.

    :param kwargs: kwargs (dictionary).
    :return: None.
    """

    # Values an implementation might want to pick up (currently disabled):
    # label = kwargs.get('label', 'unknown_label')
    # stdout = kwargs.get('output', None)

    return None
0274 
0275 
def process_debug_command(debug_command, pandaid):
    """
    Process a debug command received from the server.

    In debug mode, the server can send a special debug command to the pilot via the updateJob backchannel.
    This hook allows the command to be adjusted, e.g. to identify a proper pid to debug (which is unknown to
    the server). This implementation hands the command back unchanged.

    :param debug_command: debug command (string).
    :param pandaid: PanDA id (string); unused here.
    :return: debug command (string), identical to the input here.
    """

    updated_command = debug_command  # no processing is applied
    return updated_command
0288 
0289 
def allow_timefloor(submitmode):
    """
    Tell whether the timefloor mechanism (multi-jobs) is allowed for the given submit mode.

    The mechanism is refused only for the submit mode 'push' (compared case-insensitively), in which case an
    info message is logged.

    :param submitmode: submit mode (string).
    :return: Boolean (False for push mode, otherwise True).
    """

    if submitmode.lower() != 'push':
        return True

    logger.info('Since the submitmode=push, override timefloor with zero manually')
    return False