Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0009 
0010 import os
0011 import time
0012 import signal
0013 import re
0014 import threading
0015 
0016 from pilot.util.container import execute
0017 from pilot.util.auxiliary import whoami
0018 from pilot.util.filehandling import read_file, remove_dir_tree
0019 
0020 import logging
0021 logger = logging.getLogger(__name__)
0022 
0023 
def find_processes_in_group(cpids, pid):
    """
    Find all processes that belong to the same group.

    Recursively search for the children processes belonging to pid and add their pid's to cpids.
    pid is the parent pid and cpids is a list that has to be initialized before calling this function; on return
    it contains the pids of the children AND the parent.

    :param cpids: list of pid's for all child processes to the parent pid, as well as the parent pid itself (int).
    :param pid: parent process id (int).
    :return: None (the cpids input parameter list is updated in place).
    """

    if not pid:
        return

    cpids.append(pid)

    # note: grep matches any line containing the digits of pid (also as part of other pids),
    # which is why the ppid column is compared explicitly below
    cmd = "ps -eo pid,ppid -m | grep %d" % pid
    exit_code, psout, stderr = execute(cmd, mute=True)

    for line in psout.split("\n"):
        fields = line.split()
        if not fields:
            continue
        try:
            thispid = int(fields[0])
            thisppid = int(fields[1])
        except (IndexError, ValueError) as error:
            # skip the line; never fall through with unbound or stale values from a previous line
            logger.warning('exception caught: %s', error)
            continue
        # the 'not in cpids' guard prevents revisiting a pid (and endless recursion if the ps snapshot changes)
        if thisppid == pid and thispid not in cpids:
            find_processes_in_group(cpids, thispid)
0054 
0055 
def is_zombie(pid):
    """
    Is the given process a zombie?

    The 'ps aux' output is searched for the line whose PID column equals pid; the process is considered a
    zombie if that line is marked '<defunct>'.

    :param pid: process id (int).
    :return: boolean.
    """

    cmd = "ps aux | grep %d" % (pid)
    exit_code, stdout, stderr = execute(cmd, mute=True)

    # grep matches the digits of pid anywhere on a line (other pids, arguments, the grep command itself),
    # so only trust a '<defunct>' marker on the line whose PID column (2nd column of 'ps aux') is pid
    wanted = str(pid)
    for line in stdout.split('\n'):
        fields = line.split()
        if len(fields) > 1 and fields[1] == wanted and '<defunct>' in line:
            return True

    return False
0071 
0072 
def get_process_commands(euid, pids):
    """
    Return a list of process commands corresponding to a pid list for user euid.

    When ps succeeds, the first entry is the ps header line, followed by the 'ps u' lines whose PID column
    matches one of the given pids. An empty list is returned if the ps command failed.

    :param euid: user id (int).
    :param pids: list of process id's.
    :return: list of process commands.
    """

    cmd = 'ps u -u %d' % euid
    process_commands = []
    exit_code, stdout, stderr = execute(cmd, mute=True)

    if exit_code != 0 or stdout == '':
        logger.warning('ps command failed: %d, \"%s\", \"%s\"', exit_code, stdout, stderr)
        return process_commands

    wanted = set(str(pid) for pid in pids)
    lines = stdout.split('\n')

    # get the header info line
    process_commands.append(lines[0])

    for line in lines[1:]:
        # items = username pid ...; blank or short lines (e.g. after a trailing newline) are skipped instead
        # of raising IndexError
        items = line.split()
        if len(items) > 1 and items[1] in wanted:
            process_commands.append(line)

    return process_commands
0110 
0111 
def dump_stack_trace(pid):
    """
    Log the stack trace of a process, as printed by 'pstack <pid>'.

    Zombie processes are skipped.

    :param pid: process id (int).
    :return:
    """

    # make sure that the process is not in a zombie state
    if is_zombie(pid):
        logger.info("skipping pstack dump for zombie process")
        return

    _, trace, _ = execute("pstack %d" % (pid), mute=True, timeout=60)
    if trace:
        logger.info(trace)
    else:
        logger.info("(pstack returned empty string)")
0127 
0128 
def kill_processes(pid):
    """
    Kill all processes belonging to the given process (its process group, or else its process tree).

    If the process has its own process group (different from the pilot's), the whole group is signalled
    in one go (SIGTERM, then SIGKILL). Otherwise, or if that fails, the process and its children are
    killed one by one via kill_child_processes(). Finally, remaining orphan processes are handled by
    kill_orphans().

    :param pid: process id (int).
    :return:
    """

    # a falsy pid cannot identify a process; in particular os.getpgid(0) returns the pilot's own
    # process group, which must never be signalled
    if not pid:
        return

    # if there is a known subprocess pgrp, then it should be enough to kill the group in one go
    status = False
    try:
        pgrp = os.getpgid(pid)
    except Exception:
        pgrp = 0
    # never signal the process group the pilot itself belongs to - that would kill the pilot
    if pgrp != 0 and pgrp != os.getpgrp():
        status = kill_process_group(pgrp)

    if not status:
        # kill the process tree one by one (the athena process first, otherwise the stdout will be truncated)
        kill_child_processes(pid)

    # kill any remaining orphan processes
    # note: this should no longer be necessary since ctypes has made sure all subprocesses are parented
    # if orphan process killing is not desired, set env var PILOT_NOKILL
    kill_orphans()
0183 
0184 
def kill_child_processes(pid):
    """
    Kill the given process and all of its child processes, one by one.

    The process tree is collected first and then killed in reverse discovery order, so that descendants
    (e.g. the athena process) go before their parents - otherwise the stdout would be truncated. A stack
    trace is dumped for each process before it is killed. Nothing is killed if ps reports none of the
    processes as still running.

    :param pid: process id (int).
    :return:
    """

    tree = []
    find_processes_in_group(tree, pid)
    victims = tree[::-1]
    logger.info("process IDs to be killed: %s (in reverse order)", str(victims))

    # find which commands are still running
    try:
        running = get_process_commands(os.geteuid(), victims)
    except Exception as error:
        logger.warning("get_process_commands() threw an exception: %s", error)
        return

    # the first entry is the ps header line, so a single entry means no matching process
    if len(running) <= 1:
        logger.warning("found no corresponding commands to process id(s)")
        return

    logger.info("found commands still running:")
    for entry in running:
        logger.info(entry)

    for victim in victims:
        dump_stack_trace(victim)  # dump the stack trace before killing it
        kill_process(victim)  # kill the process gracefully (SIGTERM, then SIGKILL)
0220 
0221 
def kill_process_group(pgrp):
    """
    Kill the process group: send SIGTERM, wait 30 s to let the processes exit, then send SIGKILL.

    The waiting period is skipped if the SIGTERM could not be sent.

    :param pgrp: process group id (int).
    :return: boolean (True if the final SIGKILL was sent successfully)
    """

    logger.info("killing group process %d", pgrp)

    # kill the process gracefully
    try:
        os.killpg(pgrp, signal.SIGTERM)
    except Exception as error:
        logger.warning("exception thrown when killing child group process under SIGTERM: %s", error)
        term_sent = False
    else:
        logger.info("SIGTERM sent to process group %d", pgrp)
        term_sent = True

    if term_sent:
        grace = 30
        logger.info("sleeping %d s to allow processes to exit", grace)
        time.sleep(grace)

    try:
        os.killpg(pgrp, signal.SIGKILL)
    except Exception as error:
        logger.warning("exception thrown when killing child group process with SIGKILL: %s", error)
        return False

    logger.info("SIGKILL sent to process group %d", pgrp)
    return True
0257 
0258 
def kill_process(pid):
    """
    Kill a single process: SIGTERM first, then SIGKILL after a 10 s grace period.

    :param pid: process id (int).
    :return: boolean (True if the SIGKILL was sent successfully)
    """

    # soft kill first; its outcome is irrelevant since a SIGKILL follows anyway
    kill(pid, signal.SIGTERM)

    grace = 10
    logger.info("sleeping %d s to allow process to exit", grace)
    time.sleep(grace)

    # now do a hard kill just in case the process hasn't gone away
    return kill(pid, signal.SIGKILL)
0280 
0281 
def kill(pid, sig):
    """
    Send the given signal to the given process.

    :param pid: process id (int).
    :param sig: signal (int).
    :return status: True if the signal was sent, False if os.kill() raised (Boolean).
    """

    try:
        os.kill(pid, sig)
    except Exception as error:
        logger.warning("exception thrown when killing process %d with signal=%d: %s", pid, sig, error)
        return False

    logger.info("killed process %d with signal=%d", pid, sig)
    return True
0301 
0302 
# called checkProcesses() in Pilot 1, used by process monitoring
def get_number_of_child_processes(pid):
    """
    Get the number of processes in the process tree of the given parent process.

    Note: the count includes the parent process itself, since find_processes_in_group() adds it to the list.
    0 is returned when pid is not set or when the process search raised an exception.

    :param pid: parent process id (int).
    :return: number of child processes (int).
    """

    tree = []
    try:
        find_processes_in_group(tree, pid)
    except Exception as error:
        logger.warning("exception caught in find_processes_in_group: %s", error)
        return 0

    if not pid:
        logger.debug("pid not yet set")
        return 0

    count = len(tree)
    logger.info("number of running child processes to parent process %d: %d", pid, count)
    return count
0325 
0326 
def killpg(pid, sig, args):
    """
    Kill given process group with given signal.

    If os.killpg() fails, fall back to the shell command 'kill -<sig> <pid>', which signals the single
    process with that pid.

    :param pid: process group id (int).
    :param sig: signal (int).
    :param args: command line of the process, only used in log messages (string).
    :return:
    """

    try:
        os.killpg(int(pid), sig)
    except Exception as error:
        logger.warning("failed to execute killpg(): %s", error)
        cmd = 'kill -%d %s' % (sig, pid)
        exit_code, rs, stderr = execute(cmd)
        if exit_code != 0:
            # the reason for a failed kill is reported on stderr, so log it together with the exit code
            logger.warning("command \'%s\' failed (exit code %s): %s %s", cmd, exit_code, rs, stderr)
        else:
            logger.info("killed orphaned process %s (%s)", pid, args)
    else:
        logger.info("killed orphaned process group %s (%s)", pid, args)
0348 
0349 
def get_pilot_pid_from_processes(_processes, pattern):
    """
    Identify the pilot pid from the list of processes.

    The first line matched by pattern whose command (group 3) mentions both 'python' and 'pilot.py'
    gives the pilot pid (group 1).

    :param _processes: ps output (string).
    :param pattern: regex pattern (compiled regex string).
    :return: pilot pid (int or None).
    """

    for row in _processes.split('\n'):
        match = pattern.search(row)
        if not match:
            continue
        raw_pid, command = match.group(1), match.group(3)
        try:
            candidate = int(raw_pid)
        except Exception as error:
            logger.warning('failed to convert pid to int: %s', error)
            continue
        if 'pilot.py' in command and 'python' in command:
            return candidate

    return None
0375 
0376 
def kill_orphans():
    """
    Find and kill all orphan processes belonging to current pilot user.

    A process of the current user counts as an orphan when its parent pid is 1. Processes running cvmfs2
    or a pilot launcher (pilots_starter.py, runpilot2-wrapper.sh) are ignored. Orphaned bash processes and
    the pilot itself are reported but not killed. Every other orphan gets SIGTERM and, 10 s later, SIGKILL
    (sent to its process group via killpg()).
    Nothing is done on BOINC sites, when the PILOT_NOKILL environment variable is set, or when ps fails.

    :return:
    """

    # exception for BOINC
    if 'BOINC' in os.environ.get('PILOT_SITENAME', ''):
        logger.info("Do not look for orphan processes in BOINC jobs")
        return

    if 'PILOT_NOKILL' in os.environ:
        return

    logger.info("searching for orphan processes")

    cmd = "ps -o pid,ppid,args -u %s" % whoami()
    exit_code, _processes, stderr = execute(cmd)
    if exit_code != 0:
        # the output of a failed ps command cannot be trusted - better not kill anything based on it
        logger.warning("failed to list processes (exit code %s): %s", exit_code, stderr)
        return

    pattern = re.compile(r'(\d+)\s+(\d+)\s+([\S\s]+)')  # pid, ppid, args
    mypid = os.getpid()

    count = 0
    for line in _processes.split('\n'):
        ids = pattern.search(line)
        if not ids:
            continue
        pid = ids.group(1)
        ppid = ids.group(2)
        args = ids.group(3)
        try:
            pid = int(pid)
        except Exception as error:
            logger.warning('failed to convert pid to int: %s', error)
            continue
        if 'cvmfs2' in args:
            logger.info("ignoring possible orphan process running cvmfs2: pid=%s, ppid=%s, args=\'%s\'", pid, ppid, args)
        elif 'pilots_starter.py' in args or 'runpilot2-wrapper.sh' in args:
            logger.info("ignoring pilot launcher: pid=%s, ppid=%s, args='%s'", pid, ppid, args)
        elif ppid == '1':
            count += 1
            logger.info("found orphan process: pid=%s, ppid=%s, args='%s'", pid, ppid, args)
            if 'bash' in args:
                logger.info("will not kill bash process")
            elif pid == mypid or ('python' in args and 'pilot.py' in args):
                # never kill the pilot itself
                logger.info("will not kill pilot process")
            else:
                killpg(pid, signal.SIGTERM, args)
                _t = 10
                logger.info("sleeping %d s to allow processes to exit", _t)
                time.sleep(_t)
                killpg(pid, signal.SIGKILL, args)

    if count == 0:
        logger.info("did not find any orphan processes")
    else:
        logger.info("found %d orphan process(es)", count)
0431 
0432 
def get_max_memory_usage_from_cgroups():
    """
    Read the max_memory from CGROUPS file memory.max_usage_in_bytes.

    The memory cgroup of the current (pilot) process is taken from the '<id>:memory:<path>' line of
    /proc/<pid>/cgroup, and memory.max_usage_in_bytes is read below the memory cgroup mount point
    returned by get_cgroups_base_path().

    :return: max_memory as returned by read_file() (presumably the file content as a string - TODO confirm),
             or None if it could not be determined.
    """

    max_memory = None

    # Get the CGroups max memory using the pilot pid
    path = "/proc/%d/cgroup" % os.getpid()
    if not os.path.exists(path):
        logger.info("path %s does not exist (not a CGROUPS site)", path)
        return max_memory

    cmd = "grep memory %s" % path
    exit_code, out, stderr = execute(cmd)
    if out == "":
        logger.info("(command did not return anything)")
        return max_memory

    logger.info(out)

    # grep may return several lines (any line containing 'memory'); use the memory controller line only
    memory_lines = [line for line in out.split('\n') if ':memory:' in line]
    if not memory_lines:
        logger.warning("invalid format: %s (expected ..:memory:[path])", out)
        return max_memory

    cgroup_path = memory_lines[0].partition(':memory:')[2].strip()
    logger.info("extracted path = %s", cgroup_path)

    # the base path may contain several lines (or a trailing newline) if the controller is mounted more than once
    base_paths = [entry.strip() for entry in (get_cgroups_base_path() or '').split('\n') if entry.strip()]
    if not base_paths:
        logger.info("CGROUPS base path could not be extracted - not a CGROUPS site")
        return max_memory

    path = base_paths[0] + os.path.join(cgroup_path, "memory.max_usage_in_bytes")
    logger.info("path to CGROUPS memory info: %s", path)
    max_memory = read_file(path)

    return max_memory
0470 
0471 
def get_cgroups_base_path():
    """
    Return the base path for CGROUPS, i.e. the mount point(s) of the memory cgroup controller.

    The mount point is the second field of the cgroup lines in /proc/mounts that mention memory; the raw
    stdout of the shell pipeline is returned (empty string if nothing matched).

    :return: base path for CGROUPS (string).
    """

    _, mount_points, _ = execute("grep \'^cgroup\' /proc/mounts|grep memory| awk \'{print $2}\'", mute=True)
    return mount_points
0483 
0484 
def get_cpu_consumption_time(t0):
    """
    Return the CPU time consumed by child processes since t0: the increase in children user time plus
    children system time, as reported by os.times().
    Note: the os.times() tuple is user time, system time, children user time, children system time, and
    elapsed real time since a fixed point in the past.

    :param t0: initial os.times() tuple prior to measurement.
    :return: system+user time for child processes (float).
    """

    now = os.times()
    # indices 2 and 3 hold the children user and children system times
    return sum(now[idx] - t0[idx] for idx in (2, 3))
0500 
0501 
def get_instant_cpu_consumption_time(pid):
    """
    Return the CPU consumption time (system+user time) for a given process, by parsing /proc/pid/stat.

    Note 1: the function returns 0.0 if the pid is not set, if the stat file cannot be read (e.g. the
    process has exited) or if its content cannot be parsed.
    Note 2: the function must sum up all the user+system times for both the main process (pid) and the child
    processes, since the main process is most likely spawning new processes.

    :param pid: process id (int).
    :return: system+user time for a given pid (float).
    """

    hz = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
    if not isinstance(hz, int):
        logger.warning('unknown SC_CLK_TCK: %s', str(hz))
        return 0.0

    if not pid or hz <= 0:
        return 0.0

    path = "/proc/%d/stat" % pid
    try:
        with open(path) as fp:
            content = fp.read()
    except OSError:
        # no such process (or it exited after the listing)
        return 0.0

    # field 2 (comm) is enclosed in parentheses and may itself contain spaces, so only split what follows
    # the last ')'; the remaining list starts with field 3 (state), hence utime..cstime (fields 14-17) are
    # at indices 11-14
    fields = content.rpartition(')')[2].split()
    try:
        utime, stime, cutime, cstime = [float(f) / hz for f in fields[11:15]]
    except ValueError:
        return 0.0

    # sum up all the user+system times for both the main process (pid) and the child processes;
    # individual terms are frequently 0 (e.g. cutime/cstime), which must not zero the total
    return utime + stime + cutime + cstime
0537 
0538 
def get_current_cpu_consumption_time(pid):
    """
    Get the current CPU consumption time (system+user time) of a process tree.

    The instantaneous CPU time of the given process and of every one of its child processes is summed.

    :param pid: process id (int).
    :return: system+user time for a given pid (float).
    """

    tree = []
    find_processes_in_group(tree, pid)

    total = 0
    for member in tree:
        # falsy results (0.0 / None) contribute nothing
        total += get_instant_cpu_consumption_time(member) or 0
    return total
0558 
0559 
def is_process_running(process_id):
    """
    Check whether process is still running.

    Signal 0 is sent to the process: no signal is delivered, but the kernel still performs the existence
    and permission checks.

    :param process_id: process id (int).
    :return: Boolean.
    """
    try:
        # note that this kill function call will not kill the process
        os.kill(process_id, 0)
    except PermissionError:
        # EPERM: the process exists, it just belongs to another user
        return True
    except OSError:
        # e.g. ESRCH: no such process
        return False
    return True
0573 
0574 
def cleanup(job, args):
    """
    Cleanup called after completion of job.

    The job work directory is removed (only when args.cleanup is set), zombie processes are collected and
    finally all remaining subprocesses of job.pid are killed.

    :param job: job object
    :param args: pilot arguments (only args.cleanup is read here).
    :return:
    """

    logger.info("overall cleanup function is called")

    if not args.cleanup:
        logger.info('workdir not removed %s', job.workdir)
    else:
        # make sure the workdir is deleted
        if remove_dir_tree(job.workdir):
            logger.info('removed %s', job.workdir)
        if not os.path.exists(job.workdir):
            logger.debug('work directory was removed: %s', job.workdir)
        else:
            logger.warning('work directory still exists: %s', job.workdir)

    # collect any zombie processes
    job.collect_zombies(tn=10)
    logger.info("collected zombie processes")

    if not job.pid:
        logger.warning('cannot kill any subprocesses since job.pid is not set')
        return

    logger.info("will now attempt to kill all subprocesses of pid=%d", job.pid)
    kill_processes(job.pid)
0608 
0609 
def threads_aborted(abort_at=2):
    """
    Have the threads been aborted?

    The threads are considered aborted when the number of alive non-daemon threads (the main thread included)
    equals abort_at.

    :param abort_at: 1 for workflow finish, 2 for thread finish (since check is done just before thread finishes) (int).
    :return: Boolean.
    """

    # count all non-daemon threads from one single snapshot (daemon threads are ignored, they will be aborted
    # when python ends); threading.activeCount() and Thread.isDaemon() are deprecated aliases
    non_daemon_threads = sum(1 for thread in threading.enumerate() if not thread.daemon)

    if non_daemon_threads == abort_at:
        logger.debug('aborting since the last relevant thread is about to finish')
        return True

    return False
0632 
0633 
def convert_ps_to_dict(output, pattern=r'(\d+) (\d+) (\d+) (.+)'):
    """
    Convert output from a ps command to a dictionary.

    The first non-empty line is the header; each of its columns becomes a key. Every following line matching
    pattern adds one value per column (regex group i+1 for column i), converted to int when possible.
    Lines that do not match, or that do not provide a group for every column, are skipped.

    Example: ps axo pid,ppid,pgid,cmd
      PID  PPID  PGID COMMAND
      22091  6672 22091 bash
      32581 22091 32581 ps something;sdfsdfds/athena.py ddfg
      -> dictionary = { 'PID': [22091, 32581], 'PPID': [6672, 22091], .. , 'COMMAND': ['bash', 'ps ..']}

    :param output: ps stdout (string).
    :param pattern: regex pattern matching the ps output (raw string).
    :return: dictionary.
    """

    def _int_or_str(value):
        # numeric columns become ints, anything else (e.g. COMMAND) is kept as is
        try:
            return int(value)
        except (TypeError, ValueError):
            return value

    dictionary = {}
    first_line = []  # e.g. PID PPID PGID COMMAND

    for line in output.split('\n'):
        # remove leading and trailing spaces, then multiple spaces inside the line
        _l = re.sub(' +', ' ', line.strip())

        if not first_line:
            first_line = [_f for _f in _l.split(' ') if _f]
            for key in first_line:
                dictionary[key] = []
            continue

        # e.g. 22091 6672 22091 bash
        try:
            match = re.search(pattern, _l)
            if not match:
                continue
            # build the complete row before appending anything, so that a malformed line cannot leave
            # the column lists with different lengths
            row = [_int_or_str(match.group(i + 1)) for i in range(len(first_line))]
        except Exception as error:
            logger.warning("unexpected format of utility output: %s", error)
            continue

        for key, value in zip(first_line, row):
            dictionary[key].append(value)

    return dictionary
0678 
0679 
def get_trimmed_dictionary(keys, dictionary):
    """
    Return a sub-dictionary with only the given keys.

    Keys that are not present in the full dictionary are silently ignored.

    :param keys: keys to keep (list).
    :param dictionary: full dictionary.
    :return: trimmed dictionary.
    """

    return {key: dictionary[key] for key in keys if key in dictionary}
0695 
0696 
def find_cmd_pids(cmd, ps_dictionary):
    """
    Find all pids for the given command.
    Example. cmd = 'athena.py' -> pids = [1234, 2267] (in case there are two pilots running on the WN).

    A pid is reported when cmd is a substring of the corresponding COMMAND entry.

    :param cmd: command (string).
    :param ps_dictionary: converted ps output with 'PID' and 'COMMAND' lists (dictionary), see convert_ps_to_dict().
    :return: list of pids (list); empty when the dictionary lacks the PID or COMMAND column.
    """

    commands = ps_dictionary.get('COMMAND') or []
    pids = ps_dictionary.get('PID') or []
    # str(): convert_ps_to_dict() turns purely numeric entries into ints
    return [pid for _cmd, pid in zip(commands, pids) if cmd in str(_cmd)]
0713 
0714 
def find_pid(pandaid, ps_dictionary):
    """
    Find the process id for the command that contains 'export PandaID=%d'.

    The PandaID must not be followed by another digit, so that e.g. PandaID 12 does not match a command
    exporting PandaID=123.

    :param pandaid: PanDA ID (string).
    :param ps_dictionary: ps output dictionary with 'PID' and 'COMMAND' lists, see convert_ps_to_dict().
    :return: pid (int), or -1 if no matching command was found.
    """

    pandaid_pattern = re.compile(r'export PandaID=%s(?!\d)' % re.escape(str(pandaid)))
    commands = ps_dictionary.get('COMMAND') or []
    pids = ps_dictionary.get('PID') or []
    for _cmd, pid in zip(commands, pids):
        if pandaid_pattern.search(str(_cmd)):
            return pid

    return -1
0734 
0735 
def is_child(pid, pandaid_pid, dictionary):
    """
    Is the given pid a child process of the pandaid_pid?

    Follow the parent chain of pid (using the PID/PPID columns of dictionary) until the parent pandaid_pid has
    been found. Return False if the chain ends at a pid that is not in the PID list, or loops back on itself.

    :param pid: process id to check (int).
    :param pandaid_pid: process id of the presumed ancestor (int).
    :param dictionary: converted ps output with 'PID' and 'PPID' lists (dictionary), see convert_ps_to_dict().
    :return: Boolean.
    """

    # map each pid to its parent; the first occurrence wins (as with the list.index() lookup)
    parents = {}
    for _pid, _ppid in zip(dictionary.get('PID') or [], dictionary.get('PPID') or []):
        parents.setdefault(_pid, _ppid)

    visited = set()
    current = pid
    # iterate instead of recursing; the visited set stops cycles (e.g. a PID equal to its own PPID)
    while current in parents and current not in visited:
        visited.add(current)
        ppid = parents[current]
        # is the current parent the same as the pandaid_pid? if yes, we are done
        if ppid == pandaid_pid:
            return True
        current = ppid

    return False