Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0009 
0010 from pilot.common.errorcodes import ErrorCodes
0011 from pilot.util.auxiliary import whoami, set_pilot_state, cut_output, locate_core_file
0012 from pilot.util.config import config
0013 from pilot.util.container import execute
0014 from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy
0015 from pilot.util.parameters import convert_to_int
0016 from pilot.util.processes import kill_processes
0017 from pilot.util.timing import time_stamp
0018 
0019 import os
0020 import time
0021 import logging
logger = logging.getLogger(name=__name__)  # module-level logger for the looping job algorithm

errors = ErrorCodes()  # error code table used by kill_looping_job() to tag the failed job
0025 
0026 
def looping_job(job, mt):
    """
    Looping job detection algorithm.

    Identify hanging tasks/processes. Did the stage-in/out finish within the allowed time limit, or did the payload
    update any files recently? The files must have been touched within the given looping_limit, or the process will
    be terminated.

    Only the catch-all state (most likely 'running') is currently checked; the 'stagein' and 'stageout' branches are
    placeholders. When the payload is found to be looping, a core dump is attempted first (a debugging aid only) and
    the job is then killed via kill_looping_job(), which records the pilot error code on the job object.

    :param job: job object.
    :param mt: `MonitoringTime` object.
    :return: exit code (int), diagnostics (string). Currently always 0 and "" - a detected looping job is reported
             through the job object (error codes and state set by kill_looping_job()), not through the return value.
    """

    exit_code = 0
    diagnostics = ""

    logger.info('checking for looping job')

    looping_limit = get_looping_job_limit()

    if job.state == 'stagein':
        # set job.state to stagein during stage-in before implementing this algorithm
        pass
    elif job.state == 'stageout':
        # set job.state to stageout during stage-out before implementing this algorithm
        pass
    else:
        # most likely in the 'running' state, but use the catch-all 'else'

        # get the time when the files in the workdir were last touched. in case no file was touched since the last
        # check, the returned value will be the same as the previous time
        time_last_touched = get_time_for_last_touch(job, mt, looping_limit)

        # the payload process is considered to be looping if its files have not been touched within looping_limit time
        if time_last_touched:
            ct = int(time.time())
            logger.info('current time: %d', ct)
            logger.info('last time files were touched: %d', time_last_touched)
            logger.info('looping limit: %d s', looping_limit)
            if ct - time_last_touched > looping_limit:
                # first produce core dump and copy it. this is only a debugging aid, so a failure here must not
                # prevent the looping job from being killed below
                try:
                    create_core_dump(pid=job.pid, workdir=job.workdir)
                except Exception as error:
                    logger.warning('exception caught: %s', error)
                else:
                    # set debug mode to prevent core file from being removed before log creation
                    job.debug = True

                try:
                    kill_looping_job(job)
                except Exception as error:
                    logger.warning('exception caught: %s', error)
        else:
            logger.info('no files were touched')

    return exit_code, diagnostics
0078 
0079 
def create_core_dump(pid=None, workdir=None):
    """
    Create a core dump of the given process with gdb and copy it to the work directory.

    This is a debugging aid: a missing pid/workdir, a failing gdb command or a failed copy are logged as warnings
    and the function simply returns.

    :param pid: process id of the process to dump (int).
    :param workdir: directory the core file is copied to (string).
    :return: None.
    """

    if not pid or not workdir:
        logger.warning('cannot create core file since pid or workdir is unknown')
        return

    # --batch makes gdb exit (detaching from the process) once the -ex commands have run, instead of dropping into an
    # interactive prompt that could block waiting for stdin. per the GDB manual, in batch mode gdb also exits with a
    # nonzero status when a command fails, which is what the exit_code check below relies on
    cmd = 'gdb --batch --pid %d -ex \'generate-core-file\'' % pid
    exit_code, stdout, stderr = execute(cmd)
    if exit_code:
        logger.warning('failed to execute command: %s, stdout+err=%s', cmd, stdout + stderr)
        return

    path = locate_core_file(pid=pid)
    if not path:
        logger.warning('could not locate core file for pid %d', pid)
        return

    try:
        copy(path, workdir)
    except Exception as error:
        logger.warning('failed to copy core file: %s', error)
    else:
        logger.debug('copied core dump to workdir')
0103 
0104 
def get_time_for_last_touch(job, mt, looping_limit):
    """
    Return the time when the files in the workdir were last touched.

    Files modified within the last looping_limit seconds (rounded down to whole minutes for `find -mmin`) are
    located, filtered through the user-specific loopingjob_definitions.remove_unwanted_files(), and the most recent
    modification time is stored in mt via mt.update('ct_looping_last_touched', ...).
    In case no file was touched since the last check (or any step fails), the stored value is left unchanged, so the
    returned value will be the same as the previous time.

    :param job: job object.
    :param mt: `MonitoringTime` object.
    :param looping_limit: looping limit in seconds.
    :return: mt.ct_looping_last_touched - time in seconds since epoch (int), or its previous value (whatever
             MonitoringTime initialised it to) if no new modification time was found.
    """

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    loopingjob_definitions = __import__('pilot.user.%s.loopingjob_definitions' % pilot_user,
                                        globals(), locals(), [pilot_user], 0)  # Python 2/3

    # locate all files that were modified the last N minutes
    cmd = "find %s -type f -mmin -%d" % (job.workdir, int(looping_limit / 60))
    exit_code, stdout, stderr = execute(cmd)
    if exit_code != 0:
        # cut the output if too long
        stdout = cut_output(stdout)
        stderr = cut_output(stderr)
        logger.warning('find command failed: %d, %s, %s', exit_code, stdout, stderr)
        return mt.ct_looping_last_touched

    if stdout == "":
        logger.warning('found no recently updated files')
        return mt.ct_looping_last_touched

    # one path per line; find may end its output with a \n (even for single entries), so drop the resulting empty
    # entries rather than counting them and passing them on as file names
    files = [path for path in stdout.split("\n") if path]
    if files:
        # remove unwanted list items (*.py, *.pyc, workdir, ...)
        files = loopingjob_definitions.remove_unwanted_files(job.workdir, files)
    if not files:
        logger.warning("found no recently updated files!")
        return mt.ct_looping_last_touched

    logger.info('found %d files that were recently updated', len(files))
    updated_files = verify_file_list(files)

    # now get the mod times for these files, and identify the most recently updated file
    latest_modified_file, mtime = find_latest_modified_file(updated_files)
    if not latest_modified_file:
        logger.warning('looping job algorithm failed to identify latest updated file')
        return mt.ct_looping_last_touched

    logger.info("file %s is the most recently updated file (at time=%d)", latest_modified_file, mtime)

    # store the time of the last file modification
    mt.update('ct_looping_last_touched', modtime=mtime)

    return mt.ct_looping_last_touched
0155 
0156 
def _looping_job_diagnostic_commands(workdir):
    """
    Yield, one at a time, the shell commands whose output is logged before a looping job is killed.

    Commands are built lazily so each one is constructed right before it is executed.

    :param workdir: job work directory (string).
    :return: generator of command strings.
    """

    yield 'ps -fwu %s' % whoami()
    yield 'ls -ltr %s' % (workdir)
    yield 'ps -o pid,ppid,sid,pgid,tpgid,stat,comm -u %s' % whoami()
    yield 'pstree -g -a'


def kill_looping_job(job):
    """
    Kill the looping payload and mark the job as failed.

    Before killing, the process table, the work directory listing and the process tree are written to the log.
    The error code reflects the phase the job was stuck in (stage-in timeout, stage-out timeout, or looping job).
    Lingering input files are removed from the work directory, then kill_processes() is called on job.pid.
    TODO: add allow_looping_job() exp. spec?

    :param job: job object (updated in place: error codes and pilot state).
    :return: None.
    """

    # the child process is looping, kill it
    diagnostics = "pilot has decided to kill looping job %s at %s" % (job.jobid, time_stamp())
    logger.fatal(diagnostics)

    # log the state of the user's processes and of the work directory for later debugging
    for command in _looping_job_diagnostic_commands(job.workdir):
        _, output, _ = execute(command, mute=True)
        logger.info("%s: %s", command + '\n', output)

    # pick the error code for the phase the job was in; anything other than stage-in/out counts as looping
    phase_error_codes = {
        'stagein': errors.STAGEINTIMEOUT,
        'stageout': errors.STAGEOUTTIMEOUT,
    }
    error_code = phase_error_codes.get(job.state, errors.LOOPINGJOB)
    job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error_code)
    set_pilot_state(job=job, state="failed")

    # remove any lingering input files from the work dir
    input_lfns, _ = job.get_lfns_and_guids()
    if input_lfns and remove_files(job.workdir, input_lfns) != 0:
        logger.warning('failed to remove all files')

    kill_processes(job.pid)
0204 
0205 
def get_looping_job_limit():
    """
    Return the time limit (in seconds) used for looping job detection.

    The limit is the larger of config.Pilot.looping_limit_default and config.Pilot.looping_limit_min_default.
    Two hours is passed to convert_to_int() as the default for both values (presumably used when a config value
    cannot be converted to an int - see convert_to_int).

    :return: looping job time limit in seconds (int).
    """

    two_hours = 2 * 3600
    configured_limit = convert_to_int(config.Pilot.looping_limit_default, default=two_hours)
    minimum_limit = convert_to_int(config.Pilot.looping_limit_min_default, default=two_hours)

    # never allow the limit to drop below the configured minimum
    limit = max(configured_limit, minimum_limit)
    logger.info("using looping job limit: %d s", limit)

    return limit