File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 import logging
0015 import threading
0016 import time
0017 import re
0018 from os import environ, getpid, getuid
0019 from subprocess import Popen, PIPE
0020
0021 from pilot.common.exception import PilotException, ExceededMaxWaitTime
0022 from pilot.util.auxiliary import check_for_final_server_update
0023 from pilot.util.config import config
0024 from pilot.util.constants import MAX_KILL_WAIT_TIME
0025
0026 from pilot.util.queuehandling import get_queuedata_from_job, abort_jobs_in_queues
0027 from pilot.util.timing import get_time_since_start
0028
0029 logger = logging.getLogger(__name__)
0030
0031
0032
0033
def control(queues, traces, args):
    """
    Main control function, run from the relevant workflow module.

    Runs the pilot monitoring loop until args.graceful_stop is set. Each iteration checks whether
    a kill signal has been pending for too long, whether the pilot has exceeded its maximum running
    time, runs the non-job related checks (run_checks()) and periodically inspects the threads.

    :param queues: queues object, passed on to get_queuedata_from_job() and run_checks().
    :param traces: traces object; traces.pilot['lifetime_start'] and traces.pilot['lifetime_max'] are
                   both set to the time at which this function started.
    :param args: pilot arguments object; this function uses graceful_stop, kill_time, lifetime and
                 update_server (further attributes are used by run_checks()).
    :raises PilotException: if any exception is caught while the monitoring loop is running.
    :return:
    """

    t_0 = time.time()
    traces.pilot['lifetime_start'] = t_0
    traces.pilot['lifetime_max'] = t_0

    threadchecktime = int(config.Pilot.thread_check)

    # CPU usage reporting is a debugging aid that is switched off; flip the flag to enable it
    debug_cpu_usage = False
    cpuchecktime = int(config.Pilot.cpu_check)
    tcpu = t_0

    queuedata = get_queuedata_from_job(queues)
    max_running_time = get_max_running_time(args.lifetime, queuedata)

    # NOTE(review): the comparison below triggers when the time since start exceeds
    # max_running_time PLUS grace_time, while the log message talks about max running time MINUS
    # grace time - confirm which of the two is intended before changing either
    grace_time = 10 * 60

    try:
        # overall loop counter
        niter = 0

        while not args.graceful_stop.is_set():

            # wait up to one second for graceful_stop; leave the loop as soon as it is set
            if args.graceful_stop.wait(1) or args.graceful_stop.is_set():
                logger.warning('aborting monitor loop since graceful_stop has been set')
                break

            # abort if a kill time was registered more than MAX_KILL_WAIT_TIME seconds ago
            if args.kill_time and int(time.time()) - args.kill_time > MAX_KILL_WAIT_TIME:
                logger.warning('loop has run for too long time - will abort')
                args.graceful_stop.set()
                break

            # check if the pilot has run out of time
            time_since_start = get_time_since_start(args)
            if time_since_start - grace_time > max_running_time:
                logger.fatal('max running time (%d s) minus grace time (%d s) has been exceeded - must abort pilot', max_running_time, grace_time)
                logger.info('setting REACHED_MAXTIME and graceful stop')
                environ['REACHED_MAXTIME'] = 'REACHED_MAXTIME'

                # check_for_final_server_update() is called before graceful_stop is set
                check_for_final_server_update(args.update_server)
                args.graceful_stop.set()
                break

            # the branch above always leaves the loop, so no 'else' is needed here
            if niter % 60 == 0:
                logger.info('%d s have passed since pilot start', time_since_start)
            time.sleep(1)

            # time to report the CPU usage? (debugging only, see debug_cpu_usage above)
            if debug_cpu_usage and int(time.time() - tcpu) > cpuchecktime:
                processes = get_process_info('python pilot2/pilot.py', pid=getpid())
                if processes:
                    logger.info('-' * 100)
                    logger.info('PID=%d has CPU usage=%s%% MEM usage=%s%% CMD=%s', getpid(), processes[0], processes[1], processes[2])
                    nproc = processes[3]
                    if nproc > 1:
                        logger.info('there are %d such processes running', nproc)
                    else:
                        logger.info('there is %d such process running', nproc)
                    logger.info('-' * 100)
                tcpu = time.time()

            # proceed with running the other checks
            run_checks(queues, args)

            # thread monitoring; a non-positive check interval disables it (and avoids a modulo by zero)
            if threadchecktime > 0 and int(time.time() - traces.pilot['lifetime_start']) % threadchecktime == 0:
                # NOTE(review): according to the Python documentation threading.enumerate() excludes
                # terminated threads, so this test is presumably never true - confirm the intent
                for thread in threading.enumerate():
                    if not thread.is_alive():
                        logger.fatal('thread \'%s\' is not alive', thread.name)

            niter += 1

    except Exception as error:
        # report through the module logger like the rest of this module, then escalate to the caller
        logger.warning('monitor: exception caught: %s', error)
        raise PilotException(error)

    logger.info('[monitor] control thread has ended')
0123
0124
0125
0126
0127
def get_process_info(cmd, user=None, args='aufx', pid=None):
    """
    Return process info for given command.
    The function returns a list with format [cpu, mem, command, number of commands] built from the output of
    'ps -u user args --no-headers' for a given command (e.g. python pilot2/pilot.py).

    The cpu, mem and command values are taken from the line whose process id equals the given pid (and
    whose command contains cmd); the last element counts all lines whose command contains cmd. An empty
    list is returned if no such line is found or if ps could not be executed.

    Example
      get_process_info('sshd:', pid=1362)

      nilspal   1362  0.0  0.0 183424  2528 ?        S    12:39   0:00 sshd: nilspal@pts/28
      nilspal   1363  0.0  0.0 136628  2640 pts/28   Ss   12:39   0:00  _ -tcsh
      nilspal   8603  0.0  0.0  34692  5072 pts/28   S+   12:44   0:00      _ python monitor.py
      nilspal   8604  0.0  0.0  62036  1776 pts/28   R+   12:44   0:00          _ ps -u nilspal aufx --no-headers

      -> ['0.0', '0.0', 'sshd: nilspal@pts/28', 1]

    :param cmd: command (string).
    :param user: user name or id (string or int); defaults to the id returned by getuid().
    :param args: ps arguments (string).
    :param pid: process id (int).
    :return: list with process info (l[0]=cpu usage(%), l[1]=mem usage(%), l[2]=command(string),
             l[3]=number of matching commands(int)), or an empty list.
    """

    if not user:
        user = getuid()
    # every Popen argument must be a string, while getuid() returns an int
    arguments = ['ps', '-u', str(user), args, '--no-headers']

    try:
        process = Popen(arguments, stdout=PIPE, stderr=PIPE)
        stdout, _ = process.communicate()
    except OSError as error:
        # e.g. the ps executable is missing; this is only diagnostics, so report and carry on
        logging.getLogger(__name__).warning('failed to execute \'%s\': %s', ' '.join(arguments), error)
        return []

    # the pipe delivers bytes on Python 3; decode before doing any string handling
    if isinstance(stdout, bytes):
        stdout = stdout.decode('utf-8', 'replace')

    target_pid = str(pid)
    processes = []
    num = 0
    for line in stdout.splitlines():
        # whitespace separated fields: index 1=pid, 2=cpu, 3=mem, 10 and onwards=command
        fields = line.split()
        if len(fields) < 11:
            # not a complete process line
            continue
        command = ' '.join(fields[10:])
        if cmd not in command:
            continue
        num += 1
        if fields[1] == target_pid:
            processes = [fields[2], fields[3], command]

    if processes:
        processes.append(num)

    return processes
0176
0177
def run_checks(queues, args):
    """
    Perform non-job related monitoring checks.

    Nothing is done unless args.abort_job is set. If it is set, the jobs in the queues are aborted
    (via abort_jobs_in_queues()) and the function waits for args.job_aborted to be set: first for up to
    two minutes, then, after making sure that graceful_stop is set, for up to ten more seconds.

    :param queues: queues object, passed on to abort_jobs_in_queues().
    :param args: pilot arguments object; uses the abort_job, job_aborted and graceful_stop events and signal.
    :raises ExceededMaxWaitTime: if job_aborted was not seen within the maximum waiting time.
    :return:
    """

    if not args.abort_job.is_set():
        return

    abort_jobs_in_queues(queues, args.signal)

    t_max = 2 * 60
    logger.warning('pilot monitor received instruction that abort_job has been requested')
    logger.warning('will wait for a maximum of %d seconds for threads to finish', t_max)
    if _wait_for_job_aborted(args, t_max):
        return

    if args.graceful_stop.is_set():
        logger.info('graceful_stop already set')
    else:
        logger.warning('setting graceful_stop')
        args.graceful_stop.set()

    if not args.job_aborted.is_set():
        # set the shorter limit before logging it, so the message reports the time actually waited
        t_max = 10
        logger.warning('will wait for a maximum of %d seconds for graceful_stop to take effect', t_max)
        if _wait_for_job_aborted(args, t_max):
            return

    diagnostics = 'reached maximum waiting time - threads should have finished'
    args.abort_job.clear()
    args.job_aborted.set()
    raise ExceededMaxWaitTime(diagnostics)


def _wait_for_job_aborted(args, t_max):
    """
    Poll once per second, for at most t_max seconds, until args.job_aborted is set.

    If job_aborted is seen, a warning is logged and args.abort_job is cleared.

    :param args: pilot arguments object; uses the job_aborted and abort_job events.
    :param t_max: maximum waiting time in seconds (int).
    :return: True if job_aborted was seen before the time ran out, False otherwise (Boolean).
    """

    t_0 = time.time()
    while time.time() - t_0 < t_max:
        if args.job_aborted.is_set():
            logger.warning('job_aborted has been set - aborting pilot monitoring')
            args.abort_job.clear()
            return True
        time.sleep(1)

    return False
0225
0226
def get_max_running_time(lifetime, queuedata):
    """
    Return the maximum allowed running time for the pilot.
    The max time is set either as a pilot option or via the schedconfig.maxtime for the PQ in question.

    The pilot option (lifetime) is the default. It is returned when queuedata is missing, when
    queuedata.maxtime is not set, when it cannot be converted to an integer or when the converted value
    is not positive.

    :param lifetime: optional pilot option time in seconds (int).
    :param queuedata: queuedata object
    :return: max running time in seconds (int).
    """

    if not queuedata:
        logger.warning('queuedata could not be extracted from queues, will use default for max running time '
                       '(%d s)', lifetime)
        return lifetime

    maxtime = queuedata.maxtime
    if not maxtime:
        # nothing set for the queue; stay with the pilot option
        return lifetime

    try:
        max_running_time = int(maxtime)
    except (TypeError, ValueError, OverflowError) as error:
        # only the errors int() can raise are expected here
        logger.warning('exception caught: %s', error)
        logger.warning('failed to convert maxtime from queuedata, will use default value for max running time '
                       '(%d s)', lifetime)
        return lifetime

    if max_running_time <= 0:
        # a zero or negative limit is not usable; fall back to the pilot option
        logger.info('will use default value for max running time: %d s', lifetime)
        return lifetime

    logger.info('will use queuedata.maxtime value for max running time: %d s', max_running_time)
    return max_running_time
0258 return max_running_time