File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 from pilot.common.errorcodes import ErrorCodes
0011 from pilot.util.auxiliary import whoami, set_pilot_state, cut_output, locate_core_file
0012 from pilot.util.config import config
0013 from pilot.util.container import execute
0014 from pilot.util.filehandling import remove_files, find_latest_modified_file, verify_file_list, copy
0015 from pilot.util.parameters import convert_to_int
0016 from pilot.util.processes import kill_processes
0017 from pilot.util.timing import time_stamp
0018
0019 import os
0020 import time
0021 import logging
# module-level logger used by the functions below
logger = logging.getLogger(__name__)

# error-code registry; kill_looping_job uses its codes (STAGEINTIMEOUT, STAGEOUTTIMEOUT, LOOPINGJOB) and
# add_error_code() to record why a job was killed
errors = ErrorCodes()
0025
0026
def looping_job(job, mt):
    """
    Looping job detection algorithm.

    Identify hanging tasks/processes. Did the stage-in/out finish within the allowed time limit, or did the payload
    update any files recently? The files must have been touched within the given looping_limit, or the process will
    be terminated.

    Detection is currently only performed outside of the 'stagein' and 'stageout' states; in those states the
    function returns without checking anything.

    :param job: job object.
    :param mt: `MonitoringTime` object.
    :return: exit code (int), diagnostics (string). Both are currently always 0 and "" - a detected looping job is
             reported through the job object (error codes and job state are set by kill_looping_job).
    """

    exit_code = 0
    diagnostics = ""

    logger.info('checking for looping job')

    looping_limit = get_looping_job_limit()

    if job.state in ('stagein', 'stageout'):
        # no looping detection is implemented for the transfer states yet
        return exit_code, diagnostics

    # time (seconds since epoch) of the most recent file modification in the workdir; if nothing was touched since
    # the previous check, this is the previously recorded time
    time_last_touched = get_time_for_last_touch(job, mt, looping_limit)
    if not time_last_touched:
        logger.info('no files were touched')
        return exit_code, diagnostics

    ct = int(time.time())
    logger.info('current time: %d', ct)
    logger.info('last time files were touched: %d', time_last_touched)
    logger.info('looping limit: %d s', looping_limit)

    if ct - time_last_touched > looping_limit:
        # the core dump is best-effort: a failure here must not prevent the looping payload from being killed
        try:
            create_core_dump(pid=job.pid, workdir=job.workdir)
        except Exception as error:
            logger.warning('failed to create core dump: %s', error)

        # NOTE(review): debug mode is presumably set so the core file is kept until the log is created - confirm
        job.debug = True
        try:
            kill_looping_job(job)
        except Exception as error:
            logger.warning('exception caught: %s', error)

    return exit_code, diagnostics
0078
0079
def create_core_dump(pid=None, workdir=None):
    """
    Create a core dump of the given process with gdb and copy it to the work directory.

    Best-effort: a failing gdb command, a core file that cannot be located and a failing copy are all logged as
    warnings instead of being raised (exceptions raised by the helpers themselves are not caught here).

    :param pid: id of the process to dump (int).
    :param workdir: directory the core file should be copied to (string).
    :return: None.
    """

    if not pid or not workdir:
        logger.warning('cannot create core file since pid or workdir is unknown')
        return

    # -batch makes gdb exit (detaching from the attached process) once the -ex command has run, instead of falling
    # into its interactive prompt, and reports command errors through its exit status
    cmd = 'gdb -batch --pid %d -ex \'generate-core-file\'' % pid
    exit_code, stdout, stderr = execute(cmd)
    if exit_code:
        logger.warning('failed to execute command: %s, stdout+err=%s', cmd, stdout + stderr)
        return

    path = locate_core_file(pid=pid)
    if not path:
        logger.warning('could not locate core file for pid %d', pid)
        return

    try:
        copy(path, workdir)
    except Exception as error:
        logger.warning('failed to copy core file: %s', error)
    else:
        logger.debug('copied core dump to workdir')
0103
0104
def get_time_for_last_touch(job, mt, looping_limit):
    """
    Return the time when the files in the workdir were last touched.

    In case no file was touched since the last check, the returned value will be the same as the previous time.
    Files rejected by the user-specific loopingjob_definitions.remove_unwanted_files() are ignored.

    :param job: job object.
    :param mt: `MonitoringTime` object.
    :param looping_limit: looping limit in seconds.
    :return: time in seconds since epoch (int) (or None in case of failure).
    """
    import importlib

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    loopingjob_definitions = importlib.import_module('pilot.user.%s.loopingjob_definitions' % pilot_user)

    # find's -mmin works in whole minutes; a limit below one minute would become "-mmin -0", which matches
    # nothing, so never search a window shorter than one minute
    minutes = max(1, int(looping_limit / 60))
    cmd = "find %s -type f -mmin -%d" % (job.workdir, minutes)
    exit_code, stdout, stderr = execute(cmd)
    if exit_code != 0:
        # cut the output in case it is very long
        stdout = cut_output(stdout)
        stderr = cut_output(stderr)
        logger.warning('find command failed: %d, %s, %s', exit_code, stdout, stderr)
        return mt.ct_looping_last_touched

    # find ends every path with a newline, so a plain split("\n") would leave empty entries behind
    files = [path for path in stdout.splitlines() if path]
    if not files:
        logger.warning('found no recently updated files')
        return mt.ct_looping_last_touched

    files = loopingjob_definitions.remove_unwanted_files(job.workdir, files)
    if not files:
        logger.warning("found no recently updated files!")
        return mt.ct_looping_last_touched

    logger.info('found %d files that were recently updated', len(files))
    updated_files = verify_file_list(files)

    # identify the most recently modified file and its modification time
    latest_modified_file, mtime = find_latest_modified_file(updated_files)
    if not latest_modified_file:
        logger.warning('looping job algorithm failed to identify latest updated file')
        return mt.ct_looping_last_touched

    logger.info("file %s is the most recently updated file (at time=%d)", latest_modified_file, mtime)
    # remember the modification time; later checks return it when nothing new was touched
    mt.update('ct_looping_last_touched', modtime=mtime)

    return mt.ct_looping_last_touched
0155
0156
def kill_looping_job(job):
    """
    Kill the looping process.

    Before the kill:
    - some process and workdir listings are dumped to the log for debugging;
    - the pilot error code matching the job state is set and the job is marked as failed;
    - the files returned by job.get_lfns_and_guids() (presumably the input files) are removed from the workdir.
    The payload processes are killed even if one of those steps raises; the exception is then propagated.

    TODO: add allow_looping_job() exp. spec?

    :param job: job object.
    :return: None (the job object is updated in place).
    """

    diagnostics = "pilot has decided to kill looping job %s at %s" % (job.jobid, time_stamp())
    logger.fatal(diagnostics)

    try:
        # dump some process/workdir information to the log for later debugging
        user = whoami()
        debug_commands = (
            'ps -fwu %s' % user,
            'ls -ltr %s' % job.workdir,
            'ps -o pid,ppid,sid,pgid,tpgid,stat,comm -u %s' % user,
            'pstree -g -a',
        )
        for cmd in debug_commands:
            _, stdout, _ = execute(cmd, mute=True)
            logger.info("%s: %s", cmd + '\n', stdout)

        # set the error code matching the state the job was in
        if job.state == 'stagein':
            error_code = errors.STAGEINTIMEOUT
        elif job.state == 'stageout':
            error_code = errors.STAGEOUTTIMEOUT
        else:
            # most likely the 'running' state, but use the catch-all
            error_code = errors.LOOPINGJOB
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error_code)
        set_pilot_state(job=job, state="failed")

        # remove lingering files from the work dir
        lfns, _ = job.get_lfns_and_guids()
        if lfns and remove_files(job.workdir, lfns) != 0:
            logger.warning('failed to remove all files')
    finally:
        # the looping payload must be killed no matter what failed above
        kill_processes(job.pid)
0204
0205
0206 def get_looping_job_limit():
0207 """
0208 Get the time limit for looping job detection.
0209
0210 :return: looping job time limit in seconds (int).
0211 """
0212
0213 looping_limit = convert_to_int(config.Pilot.looping_limit_default, default=2 * 3600)
0214 looping_limit_min_default = convert_to_int(config.Pilot.looping_limit_min_default, default=2 * 3600)
0215 looping_limit = max(looping_limit, looping_limit_min_default)
0216 logger.info("using looping job limit: %d s", looping_limit)
0217
0218 return looping_limit