Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0010 
0011 # NOTE: this module should deal with non-job related monitoring, such as thread monitoring. Job monitoring is
0012 #       a task for the job_monitor thread in the Job component.
0013 
0014 import logging
0015 import threading
0016 import time
0017 import re
0018 from os import environ, getpid, getuid
0019 from subprocess import Popen, PIPE
0020 
0021 from pilot.common.exception import PilotException, ExceededMaxWaitTime
0022 from pilot.util.auxiliary import check_for_final_server_update
0023 from pilot.util.config import config
0024 from pilot.util.constants import MAX_KILL_WAIT_TIME
0025 # from pilot.util.container import execute
0026 from pilot.util.queuehandling import get_queuedata_from_job, abort_jobs_in_queues
0027 from pilot.util.timing import get_time_since_start
0028 
0029 logger = logging.getLogger(__name__)
0030 
0031 
0032 # Monitoring of threads functions
0033 
def control(queues, traces, args):
    """
    Main control function, run from the relevant workflow module.

    Monitor the pilot lifetime, CPU usage (currently disabled) and thread
    health in a one-second loop until graceful_stop is set or the maximum
    allowed running time is exceeded.

    :param queues: internal queues for job handling.
    :param traces: pilot traces object.
    :param args: pilot arguments object.
    :raises PilotException: in case of an unexpected exception in the monitoring loop.
    :return: None
    """

    t_0 = time.time()
    traces.pilot['lifetime_start'] = t_0  # ie referring to when pilot monitoring began
    traces.pilot['lifetime_max'] = t_0

    # how often (in seconds) to verify that all threads are alive
    threadchecktime = int(config.Pilot.thread_check)

    # for CPU usage debugging
    cpuchecktime = int(config.Pilot.cpu_check)
    tcpu = t_0

    # the PQ maxtime (if set) overrides the pilot option lifetime
    queuedata = get_queuedata_from_job(queues)
    max_running_time = get_max_running_time(args.lifetime, queuedata)

    try:
        # overall loop counter (ignoring the fact that more than one job may be running)
        niter = 0

        while not args.graceful_stop.is_set():
            # every second, run the monitoring checks
            if args.graceful_stop.wait(1) or args.graceful_stop.is_set():  # 'or' added for 2.6 compatibility
                logger.warning('aborting monitor loop since graceful_stop has been set')
                break

            # abort if kill signal arrived too long time ago, ie loop is stuck
            if args.kill_time and int(time.time()) - args.kill_time > MAX_KILL_WAIT_TIME:
                logger.warning('loop has run for too long time - will abort')
                args.graceful_stop.set()
                break

            # check if the pilot has run out of time (stop ten minutes before PQ limit)
            time_since_start = get_time_since_start(args)
            grace_time = 10 * 60
            if time_since_start - grace_time > max_running_time:
                logger.fatal('max running time (%d s) minus grace time (%d s) has been exceeded - must abort pilot', max_running_time, grace_time)
                logger.info('setting REACHED_MAXTIME and graceful stop')
                environ['REACHED_MAXTIME'] = 'REACHED_MAXTIME'  # TODO: use singleton instead
                # do not set graceful stop if pilot has not finished sending the final job update
                # i.e. wait until SERVER_UPDATE is FINAL_DONE
                check_for_final_server_update(args.update_server)
                args.graceful_stop.set()
                break
            elif niter % 60 == 0:
                logger.info('%d s have passed since pilot start', time_since_start)
            time.sleep(1)

            # time to check the CPU? (deliberately disabled with 'and False' - for testing only)
            if int(time.time() - tcpu) > cpuchecktime and False:  # for testing only
                processes = get_process_info('python pilot2/pilot.py', pid=getpid())
                if processes:
                    logger.info('-' * 100)
                    logger.info('PID=%d has CPU usage=%s%% MEM usage=%s%% CMD=%s', getpid(), processes[0], processes[1], processes[2])
                    nproc = processes[3]
                    if nproc > 1:
                        logger.info('there are %d such processes running', nproc)
                    else:
                        logger.info('there is %d such process running', nproc)
                    logger.info('-' * 100)
                tcpu = time.time()

            # proceed with running the other checks
            run_checks(queues, args)

            # thread monitoring (guard against thread_check=0 to avoid ZeroDivisionError)
            if threadchecktime and int(time.time() - traces.pilot['lifetime_start']) % threadchecktime == 0:
                # get all threads
                for thread in threading.enumerate():
                    # logger.info('thread name: %s', thread.name)
                    if not thread.is_alive():
                        logger.fatal('thread \'%s\' is not alive', thread.name)
                        # args.graceful_stop.set()

            niter += 1

    except Exception as error:
        # log via the module logger instead of printing to stdout, and keep the
        # original traceback with exception chaining
        logger.warning('monitor: exception caught: %s', error)
        raise PilotException(error) from error

    logger.info('[monitor] control thread has ended')
0123 
0124 #def log_lifetime(sig, frame, traces):
0125 #    logger.info('lifetime: %i used, %i maximum', int(time.time() - traces.pilot['lifetime_start']), traces.pilot['lifetime_max'])
0126 
0127 
def get_process_info(cmd, user=None, args='aufx', pid=None):
    """
    Return process info for given command.
    The function returns a list with format [cpu, mem, command, number of commands] as returned by 'ps -u user args' for
    a given command (e.g. python pilot2/pilot.py).

    Example
      get_processes_for_command('sshd:')

      nilspal   1362  0.0  0.0 183424  2528 ?        S    12:39   0:00 sshd: nilspal@pts/28
      nilspal   1363  0.0  0.0 136628  2640 pts/28   Ss   12:39   0:00  _ -tcsh
      nilspal   8603  0.0  0.0  34692  5072 pts/28   S+   12:44   0:00      _ python monitor.py
      nilspal   8604  0.0  0.0  62036  1776 pts/28   R+   12:44   0:00          _ ps -u nilspal aufx --no-headers

      -> ['0.0', '0.0', 'sshd: nilspal@pts/28', 1]

    :param cmd: command (string).
    :param user: user (string).
    :param args: ps arguments (string).
    :param pid: process id (int).
    :return: list with process info (l[0]=cpu usage(%), l[1]=mem usage(%), l[2]=command(string)); empty list if no match.
    """

    processes = []
    num = 0
    if not user:
        user = getuid()
    pattern = re.compile(r"\S+|[-+]?\d*\.\d+|\d+")
    # getuid() returns an int - ps (and Popen) need string arguments
    arguments = ['ps', '-u', str(user), args, '--no-headers']

    # universal_newlines makes communicate() return str instead of bytes,
    # so the str regex pattern below can be applied to the output lines
    process = Popen(arguments, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    stdout, _ = process.communicate()
    for line in stdout.splitlines():
        found = re.findall(pattern, line)
        # re.findall() always returns a list (never None); a command string only
        # exists from token 10 onwards, so skip shorter (malformed) lines
        if len(found) > 10:
            processid = found[1]
            cpu = found[2]
            mem = found[3]
            command = ' '.join(found[10:])
            if cmd in command:
                num += 1
                if processid == str(pid):
                    processes = [cpu, mem, command]

    if processes:
        processes.append(num)

    return processes
0176 
0177 
def run_checks(queues, args):
    """
    Perform non-job related monitoring checks.

    If abort_job has been requested, stop all jobs in the queues, wait for the
    threads to finish, set graceful_stop if needed and finally raise
    ExceededMaxWaitTime if job_aborted is still not set in time.

    :param queues: internal queues for job handling.
    :param args: pilot arguments object.
    :raises ExceededMaxWaitTime: if the threads did not finish within the allowed time.
    :return: None
    """

    # check CPU consumption of pilot process and its children

    if args.abort_job.is_set():
        # find all running jobs and stop them, find all jobs in queues relevant to this module
        abort_jobs_in_queues(queues, args.signal)

        t_max = 2 * 60
        logger.warning('pilot monitor received instruction that abort_job has been requested')
        logger.warning('will wait for a maximum of %d seconds for threads to finish', t_max)
        t_0 = time.time()
        while time.time() - t_0 < t_max:
            if args.job_aborted.is_set():
                logger.warning('job_aborted has been set - aborting pilot monitoring')
                args.abort_job.clear()
                return
            time.sleep(1)

        if args.graceful_stop.is_set():
            logger.info('graceful_stop already set')
        else:
            logger.warning('setting graceful_stop')
            args.graceful_stop.set()

        if not args.job_aborted.is_set():
            # bug fix: assign the new wait limit before logging it, so the logged
            # maximum (10 s) matches the actual wait time (previously 120 s was
            # logged while only 10 s were waited)
            t_max = 10
            logger.warning('will wait for a maximum of %d seconds for graceful_stop to take effect', t_max)
            t_0 = time.time()
            while time.time() - t_0 < t_max:
                if args.job_aborted.is_set():
                    logger.warning('job_aborted has been set - aborting pilot monitoring')
                    args.abort_job.clear()
                    return
                time.sleep(1)

            diagnostics = 'reached maximum waiting time - threads should have finished'
            args.abort_job.clear()
            args.job_aborted.set()
            raise ExceededMaxWaitTime(diagnostics)
0225 
0226 
def get_max_running_time(lifetime, queuedata):
    """
    Return the maximum allowed running time for the pilot.
    The max time is set either as a pilot option or via the schedconfig.maxtime for the PQ in question.

    :param lifetime: optional pilot option time in seconds (int).
    :param queuedata: queuedata object
    :return: max running time in seconds (int).
    """

    # the pilot option lifetime is the fallback value throughout
    default_time = lifetime

    # without queuedata there is nothing to override the default with
    if not queuedata:
        logger.warning('queuedata could not be extracted from queues, will use default for max running time '
                       '(%d s)', default_time)
        return default_time

    # an unset/empty schedconfig maxtime also means: keep the default
    if not queuedata.maxtime:
        return default_time

    try:
        max_running_time = int(queuedata.maxtime)
    except Exception as error:
        logger.warning('exception caught: %s', error)
        logger.warning('failed to convert maxtime from queuedata, will use default value for max running time '
                       '(%d s)', default_time)
        return default_time

    if max_running_time == 0:
        max_running_time = default_time  # fallback to default value
        logger.info('will use default value for max running time: %d s', max_running_time)
    else:
        logger.info('will use queuedata.maxtime value for max running time: %d s', max_running_time)

    return max_running_time