File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 import time
0012 import signal
0013 import re
0014 import threading
0015
0016 from pilot.util.container import execute
0017 from pilot.util.auxiliary import whoami
0018 from pilot.util.filehandling import read_file, remove_dir_tree
0019
0020 import logging
0021 logger = logging.getLogger(__name__)
0022
0023
def find_processes_in_group(cpids, pid):
    """
    Find all processes that belong to the same group.
    Recursively search for the children processes belonging to pid and append their pid's to cpids.
    pid is the parent pid and cpids is a list that has to be initialized before calling this function; afterwards it
    contains the pids of the children AND the parent (the parent is appended before its descendants).

    :param cpids: list of pid's for all child processes to the parent pid, as well as the parent pid itself (int).
    :param pid: parent process id (int).
    :return: None (the cpids input parameter list is updated in place).
    """

    if not pid:
        return

    cpids.append(pid)

    # grep matches the digits anywhere in a line, so the ppid comparison below is what actually selects children
    cmd = "ps -eo pid,ppid -m | grep %d" % pid
    exit_code, psout, stderr = execute(cmd, mute=True)

    for line in psout.split("\n"):
        fields = line.split()
        if len(fields) < 2:
            # blank line (e.g. trailing newline) - nothing to parse
            continue
        try:
            thispid = int(fields[0])
            thisppid = int(fields[1])
        except ValueError as error:
            # non-numeric columns (e.g. thread rows that 'ps -m' may print); never fall through with stale values
            logger.warning('exception caught: %s', error)
            continue
        # skip pids that were already collected so that duplicates or a bogus cycle cannot recurse forever
        if thisppid == pid and thispid not in cpids:
            find_processes_in_group(cpids, thispid)
0054
0055
def is_zombie(pid):
    """
    Is the given process a zombie?

    Only the STAT code of this particular pid is queried. The previous 'ps aux | grep <pid>' approach matched the
    digits anywhere in the full listing (other pids, CPU/memory columns), so any unrelated <defunct> process could
    give a false positive.

    :param pid: process id (int).
    :return: boolean (False also when ps fails, e.g. because the process does not exist).
    """

    cmd = "ps -o stat= -p %d" % pid
    exit_code, stdout, stderr = execute(cmd, mute=True)

    # a zombie (defunct) process has a STAT code starting with 'Z'
    return exit_code == 0 and (stdout or "").strip().startswith('Z')
0071
0072
def get_process_commands(euid, pids):
    """
    Return a list of process commands corresponding to a pid list for user euid.

    The first entry is the header line of the 'ps u' output. It is followed by the full 'ps u' lines of those
    processes whose PID column (second column) matches one of the given pids.

    :param euid: user id (int).
    :param pids: list of process id's.
    :return: list of process commands (empty if the ps command failed).
    """

    cmd = 'ps u -u %d' % euid
    process_commands = []
    exit_code, stdout, stderr = execute(cmd, mute=True)

    if exit_code != 0 or stdout == '':
        logger.warning('ps command failed: %d, \"%s\", \"%s\"', exit_code, stdout, stderr)
        return process_commands

    wanted = set(str(pid) for pid in pids)
    lines = stdout.split('\n')

    # keep the header info line as the first entry
    process_commands.append(lines[0])
    for line in lines[1:]:
        # split() collapses any run of whitespace; items = [user, pid, ...]
        items = line.split()
        # blank or truncated lines (e.g. a trailing newline) have no pid column and are skipped
        if len(items) > 1 and items[1] in wanted:
            process_commands.append(line)

    return process_commands
0110
0111
def dump_stack_trace(pid):
    """
    Log the stack trace of a process as produced by 'pstack <pid>' (60 s timeout).

    Zombie processes are skipped, since there is nothing left to trace.

    :param pid: process id (int).
    :return:
    """

    if is_zombie(pid):
        logger.info("skipping pstack dump for zombie process")
        return

    _, trace, _ = execute("pstack %d" % pid, mute=True, timeout=60)
    logger.info(trace or "(pstack returned empty string)")
0127
0128
def kill_processes(pid):
    """
    Kill processes belonging to the process group of the given process, then look for orphans.

    If pid belongs to a process group other than the pilot's own, the whole group is killed in one go
    (kill_process_group). If that is not possible or fails, all descendants of pid (and pid itself) are collected,
    a stack trace is dumped for each and each is killed individually, children before their parents. Finally
    kill_orphans() is called.

    :param pid: process id (int).
    :return:
    """

    status = False
    try:
        pgrp = os.getpgid(pid)
    except Exception:
        # e.g. the process does not exist (any more)
        pgrp = 0
    if pgrp != 0 and pgrp == os.getpgrp():
        # the process shares the pilot's own group: signalling the group would take down the pilot itself
        logger.warning("process %d is in the pilot's own process group %d - will not kill the group", pid, pgrp)
    elif pgrp != 0:
        status = kill_process_group(pgrp)

    if not status:
        # find all the children process IDs to be killed
        children = []
        find_processes_in_group(children, pid)
        if not children:
            return

        # parents are collected before their children; reverse so that children are killed first
        children.reverse()
        logger.info("process IDs to be killed: %s (in reverse order)", str(children))

        # find which commands are still running
        kill_them = True
        try:
            cmds = get_process_commands(os.geteuid(), children)
        except Exception as error:
            # unknown what is still running: kill anyway rather than leave processes behind
            logger.warning("get_process_commands() threw an exception: %s", error)
        else:
            # the first entry (if any) is the ps header line
            if len(cmds) <= 1:
                logger.warning("found no corresponding commands to process id(s)")
                kill_them = False
            else:
                logger.info("found commands still running:")
                for cmd in cmds:
                    logger.info(cmd)

        if kill_them:
            for child in children:
                # dump the stack trace before killing it
                dump_stack_trace(child)
                kill_process(child)

    # kill any remaining orphan processes (kill_orphans itself honours PILOT_NOKILL)
    kill_orphans()
0183
0184
def kill_child_processes(pid):
    """
    Kill child processes.

    All descendants of pid (and pid itself) are collected with find_processes_in_group(). A stack trace is dumped
    for each, and each is then killed with kill_process(), children before their parents.

    :param pid: process id (int).
    :return:
    """

    children = []
    find_processes_in_group(children, pid)
    if not children:
        # pid was not set: nothing to kill
        return

    # parents are collected before their children; reverse so that children are killed first
    children.reverse()
    logger.info("process IDs to be killed: %s (in reverse order)", str(children))

    # find which commands are still running
    kill_them = True
    try:
        cmds = get_process_commands(os.geteuid(), children)
    except Exception as error:
        # unknown what is still running: kill anyway rather than leave processes behind
        logger.warning("get_process_commands() threw an exception: %s", error)
    else:
        # the first entry (if any) is the ps header line
        if len(cmds) <= 1:
            logger.warning("found no corresponding commands to process id(s)")
            kill_them = False
        else:
            logger.info("found commands still running:")
            for cmd in cmds:
                logger.info(cmd)

    if kill_them:
        for child in children:
            # dump the stack trace before killing it
            dump_stack_trace(child)
            kill_process(child)
0220
0221
def kill_process_group(pgrp):
    """
    Kill the process group: SIGTERM first, then SIGKILL after a grace period.

    The grace period lasts up to 30 s, but ends as soon as the group can no longer be signalled (no members left).
    An already terminated group therefore does not delay the cleanup. If SIGTERM cannot be sent, SIGKILL is tried
    immediately.

    :param pgrp: process group id (int).
    :return: boolean (True if SIGTERM followed by SIGKILL signalling was successful; False if the final SIGKILL
             failed, e.g. because the group had already exited).
    """

    status = False

    # ask the group to terminate gracefully first
    logger.info("killing group process %d", pgrp)
    try:
        os.killpg(pgrp, signal.SIGTERM)
    except Exception as error:
        logger.warning("exception thrown when killing child group process under SIGTERM: %s", error)
    else:
        logger.info("SIGTERM sent to process group %d", pgrp)
        _t = 30
        logger.info("waiting up to %d s to allow processes to exit", _t)
        deadline = time.time() + _t
        while True:
            try:
                # signal 0 delivers nothing; it only checks whether the group can still be signalled
                os.killpg(pgrp, 0)
            except OSError:
                break
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))

    # force whatever is left
    try:
        os.killpg(pgrp, signal.SIGKILL)
    except Exception as error:
        logger.warning("exception thrown when killing child group process with SIGKILL: %s", error)
    else:
        logger.info("SIGKILL sent to process group %d", pgrp)
        status = True

    return status
0257
0258
def kill_process(pid):
    """
    Kill process: SIGTERM first, then SIGKILL after a grace period of up to 10 s.

    The grace period ends early once is_process_running() reports the process gone. It is skipped entirely when
    SIGTERM could not be sent, since SIGKILL would presumably fail the same way (same existence/permission checks).

    :param pid: process id (int).
    :return: boolean (True if successful SIGKILL)
    """

    # kill the process gracefully
    if not kill(pid, signal.SIGTERM):
        return False

    _t = 10
    logger.info("waiting up to %d s to allow process to exit", _t)
    deadline = time.time() + _t
    while is_process_running(pid):
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        time.sleep(min(1.0, remaining))

    # kill the process forcefully (returns False, as before, if it has already exited)
    return kill(pid, signal.SIGKILL)
0280
0281
def kill(pid, sig):
    """
    Send the given signal to the given process, logging the outcome.

    :param pid: process id (int).
    :param sig: signal (int).
    :return status: True when os.kill() succeeded, False when it raised (Boolean).
    """

    try:
        os.kill(pid, sig)
    except Exception as error:
        logger.warning("exception thrown when killing process %d with signal=%d: %s", pid, sig, error)
        return False

    logger.info("killed process %d with signal=%d", pid, sig)
    return True
0301
0302
0303
def get_number_of_child_processes(pid):
    """
    Get the number of child processes for a given parent process.

    NOTE: the count is the length of the list filled by find_processes_in_group(), which also contains the parent
    pid itself.

    :param pid: parent process id (int).
    :return: number of processes found (int); 0 if pid is not set or the search raised.
    """

    found = []
    try:
        find_processes_in_group(found, pid)
    except Exception as error:
        logger.warning("exception caught in find_processes_in_group: %s", error)
        return 0

    if not pid:
        logger.debug("pid not yet set")
        return 0

    count = len(found)
    logger.info("number of running child processes to parent process %d: %d", pid, count)
    return count
0325
0326
def killpg(pid, sig, args):
    """
    Kill given process group with given signal.

    If os.killpg() fails, the single process is signalled instead with the 'kill' shell command.

    :param pid: process group id (int or numeric string).
    :param sig: signal (int).
    :param args: command line of the process, only used in log messages (string).
    :return:
    """

    try:
        pgid = int(pid)
    except (TypeError, ValueError) as error:
        # never hand a non-numeric id on to the shell command below
        logger.warning("invalid process group id %s: %s", pid, error)
        return

    try:
        os.killpg(pgid, sig)
    except Exception as error:
        logger.warning("failed to execute killpg(): %s", error)
        # fall back to signalling just the process itself
        cmd = 'kill -%d %d' % (sig, pgid)
        exit_code, stdout, stderr = execute(cmd)
        if exit_code != 0:
            # the reason for the failure is reported on stderr, not stdout
            logger.warning("failed to kill process %s (exit code %s): %s", pid, exit_code, stderr or stdout)
        else:
            logger.info("killed orphaned process %s (%s)", pid, args)
    else:
        logger.info("killed orphaned process group %s (%s)", pid, args)
0348
0349
def get_pilot_pid_from_processes(_processes, pattern):
    """
    Identify the pilot pid from the list of processes.

    The first line whose match has a command (group 3) containing both 'python' and 'pilot.py' wins; its pid
    (group 1) is returned.

    :param _processes: ps output (string).
    :param pattern: regex pattern (compiled regex string) with pid in group 1 and the command in group 3.
    :return: pilot pid (int or None).
    """

    for line in _processes.split('\n'):
        found = pattern.search(line)
        if not found:
            continue
        candidate, command = found.group(1), found.group(3)
        try:
            candidate = int(candidate)
        except Exception as error:
            logger.warning('failed to convert pid to int: %s', error)
            continue
        if 'pilot.py' in command and 'python' in command:
            return candidate

    return None
0375
0376
def kill_orphans():
    """
    Find and kill all orphan processes belonging to current pilot user.

    Nothing is done at BOINC sites (PILOT_SITENAME containing 'BOINC') or when PILOT_NOKILL is set. Otherwise the
    current user's processes are listed with ps. Processes running cvmfs2 and pilot launchers (pilots_starter.py,
    runpilot2-wrapper.sh) are ignored. Of the rest, every process whose parent pid is 1 counts as an orphan.
    Orphaned bash processes and python pilot.py processes are counted but spared. All other orphans get SIGTERM,
    a 10 s grace period and then SIGKILL, via killpg() on their pid.

    :return:
    """

    if 'BOINC' in os.environ.get('PILOT_SITENAME', ''):
        logger.info("Do not look for orphan processes in BOINC jobs")
        return
    if 'PILOT_NOKILL' in os.environ:
        return

    logger.info("searching for orphan processes")

    _, ps_output, _ = execute("ps -o pid,ppid,args -u %s" % whoami())
    # columns: pid, ppid, full command line
    line_pattern = re.compile(r'(\d+)\s+(\d+)\s+([\S\s]+)')

    orphans = 0
    for line in ps_output.split('\n'):
        found = line_pattern.search(line)
        if not found:
            continue
        raw_pid, ppid, args = found.group(1), found.group(2), found.group(3)
        try:
            pid = int(raw_pid)
        except Exception as error:
            logger.warning('failed to convert pid to int: %s', error)
            continue

        if 'cvmfs2' in args:
            logger.info("ignoring possible orphan process running cvmfs2: pid=%s, ppid=%s, args=\'%s\'", pid, ppid, args)
            continue
        if 'pilots_starter.py' in args or 'runpilot2-wrapper.sh' in args:
            logger.info("ignoring pilot launcher: pid=%s, ppid=%s, args='%s'", pid, ppid, args)
            continue
        if ppid != '1':
            continue

        orphans += 1
        logger.info("found orphan process: pid=%s, ppid=%s, args='%s'", pid, ppid, args)
        if 'bash' in args or ('python' in args and 'pilot.py' in args):
            logger.info("will not kill bash process")
            continue

        killpg(pid, signal.SIGTERM, args)
        grace = 10
        logger.info("sleeping %d s to allow processes to exit", grace)
        time.sleep(grace)
        killpg(pid, signal.SIGKILL, args)

    if orphans:
        logger.info("found %d orphan process(es)", orphans)
    else:
        logger.info("did not find any orphan processes")
0431
0432
def get_max_memory_usage_from_cgroups():
    """
    Read the max_memory from CGROUPS file memory.max_usage_in_bytes.

    The memory cgroup of the current (pilot) process is taken from /proc/<pid>/cgroup; only the '..:memory:<path>'
    line format is recognised. The file <cgroups base path><path>/memory.max_usage_in_bytes is then read with
    read_file().

    :return: max_memory - whatever read_file() returns for that file (presumably its contents; the original
             documentation said int - TODO confirm), or None if any step fails.
    """

    cgroup_file = "/proc/%d/cgroup" % os.getpid()
    if not os.path.exists(cgroup_file):
        logger.info("path %s does not exist (not a CGROUPS site)", cgroup_file)
        return None

    _, memory_line, _ = execute("grep memory %s" % cgroup_file)
    if memory_line == "":
        logger.info("(command did not return anything)")
        return None

    logger.info(memory_line)
    if ":memory:" not in memory_line:
        logger.warning("invalid format: %s (expected ..:memory:[path])", memory_line)
        return None

    # everything from the first '/' on is the cgroup path
    relative_path = memory_line[memory_line.find('/'):]
    logger.info("extracted path = %s", relative_path)

    base_path = get_cgroups_base_path()
    if base_path == "":
        logger.info("CGROUPS base path could not be extracted - not a CGROUPS site")
        return None

    usage_file = base_path + os.path.join(relative_path, "memory.max_usage_in_bytes")
    logger.info("path to CGROUPS memory info: %s", usage_file)
    return read_file(usage_file)
0470
0471
def get_cgroups_base_path():
    """
    Return the base path for CGROUPS.

    The second column (the mount point) of the cgroup lines in /proc/mounts that mention 'memory' is used. If several
    such lines exist, only the first one is returned. Surrounding whitespace is removed, since the value is
    prepended to a file path by get_max_memory_usage_from_cgroups().

    :return: base path for CGROUPS (string; empty if none was found).
    """

    cmd = "grep \'^cgroup\' /proc/mounts|grep memory| awk \'{print $2}\'"
    exit_code, base_path, stderr = execute(cmd, mute=True)

    # guard against a missing stdout, a trailing newline or several matching mounts
    return (base_path or "").strip().split('\n')[0].strip()
0483
0484
def get_cpu_consumption_time(t0):
    """
    Return the CPU consumption time for child processes measured by system+user time from os.times().
    Note: the os.times() tuple is user time, system time, children's user time, children's system time, and elapsed
    real time since a fixed point in the past; only the two children's entries (indices 2 and 3) are used.

    :param t0: initial os.times() tuple prior to measurement.
    :return: system+user time for child processes (float).
    """

    now = os.times()
    children_user = now[2] - t0[2]
    children_system = now[3] - t0[3]
    return children_user + children_system
0500
0501
def get_instant_cpu_consumption_time(pid):
    """
    Return the CPU consumption time (system+user time) for a given process, by parsing /proc/pid/stat.
    Note 1: the function returns 0.0 if the pid is not set, if the stat file cannot be read (e.g. the process does
    not exist (any more)) or if its content cannot be parsed.
    Note 2: the function sums the user+system times of the process itself (utime, stime) and those recorded for its
    children (cutime, cstime), since the main process is most likely spawning new processes.

    :param pid: process id (int).
    :return: system+user time for a given pid (float).
    """

    hz = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
    if not isinstance(hz, int):
        logger.warning('unknown SC_CLK_TCK: %s', str(hz))
        return 0.0

    if not pid or hz <= 0:
        return 0.0

    path = "/proc/%d/stat" % pid
    try:
        with open(path) as fp:
            stat = fp.read()
    except (IOError, OSError):
        # the process may have exited between listing and reading, or there is no /proc file system
        return 0.0

    # field 2 (comm) is wrapped in parentheses and may itself contain spaces, so only split what follows the last ')'.
    # That remainder starts at field 3 (state), which puts utime, stime, cutime, cstime (fields 14-17) at 11..14.
    fields = stat[stat.rfind(')') + 1:].split()[11:15]
    try:
        times = [float(field) / hz for field in fields]
    except ValueError:
        return 0.0
    if len(times) != 4:
        return 0.0

    # any of the four may legitimately be zero (e.g. no children), so they are summed unconditionally
    return sum(times)
0537
0538
def get_current_cpu_consumption_time(pid):
    """
    Get the current CPU consumption time (system+user time) for a given process, by looping over all child processes.

    The process itself and all its descendants (as found by find_processes_in_group) are included; processes for
    which get_instant_cpu_consumption_time() yields a falsy value are skipped.

    :param pid: process id (int).
    :return: system+user time for a given pid (float, or int 0 if nothing was found).
    """

    group = []
    find_processes_in_group(group, pid)

    return sum(filter(None, map(get_instant_cpu_consumption_time, group)))
0558
0559
def is_process_running(process_id):
    """
    Check whether process is still running.

    Signal 0 is sent, which performs the existence and permission checks without delivering anything. A permission
    error (EPERM) means the process exists but belongs to another user, so it counts as running. A pid that is not
    set or not positive returns False; for 0 or negative values os.kill() would address a whole process group
    instead of a process.

    :param process_id: process id (int).
    :return: Boolean.
    """
    import errno

    if not process_id or process_id < 0:
        return False

    try:
        os.kill(process_id, 0)
    except OSError as error:
        return error.errno == errno.EPERM

    return True
0573
0574
def cleanup(job, args):
    """
    Cleanup called after completion of job.

    Removes the job work directory when args.cleanup is set (and reports whether it is really gone). Then collects
    zombie processes via job.collect_zombies(tn=10), and finally kills all subprocesses of job.pid, if set, with
    kill_processes().

    :param job: job object (uses .workdir, .pid and .collect_zombies()).
    :param args: pilot arguments (uses .cleanup).
    :return:
    """

    logger.info("overall cleanup function is called")

    if not args.cleanup:
        logger.info('workdir not removed %s', job.workdir)
    else:
        if remove_dir_tree(job.workdir):
            logger.info('removed %s', job.workdir)
        # verify independently of what remove_dir_tree() reported
        if not os.path.exists(job.workdir):
            logger.debug('work directory was removed: %s', job.workdir)
        else:
            logger.warning('work directory still exists: %s', job.workdir)

    job.collect_zombies(tn=10)
    logger.info("collected zombie processes")

    if not job.pid:
        logger.warning('cannot kill any subprocesses since job.pid is not set')
        return

    logger.info("will now attempt to kill all subprocesses of pid=%d", job.pid)
    kill_processes(job.pid)
0606
0607
0608
0609
def threads_aborted(abort_at=2):
    """
    Have the threads been aborted?

    The number of alive non-daemon threads (as listed by threading.enumerate()) is compared with abort_at.

    :param abort_at: 1 for workflow finish, 2 for thread finish (since check is done just before thread finishes) (int).
    :return: Boolean.
    """

    # one snapshot for both the total and the daemon count, so that a thread starting or ending in between cannot
    # make them disagree; the 'daemon' attribute replaces the deprecated isDaemon()/activeCount() aliases
    non_daemon_threads = len([thread for thread in threading.enumerate() if not thread.daemon])

    aborted = non_daemon_threads == abort_at
    if aborted:
        logger.debug('aborting since the last relevant thread is about to finish')

    return aborted
0632
0633
def convert_ps_to_dict(output, pattern=r'(\d+) (\d+) (\d+) (.+)'):
    """
    Convert output from a ps command to a dictionary.

    The first non-empty line is the header, and each of its words becomes a key. Every following line matching
    `pattern` appends one value per column (regex group i+1 for header word i). Values are converted to int where
    possible. Runs of spaces are collapsed before matching.

    Example ps output (columns pid, ppid, pgid, command):
      PID  PPID  PGID COMMAND
    22091  6672 22091 bash
    32581 22091 32581 ps something;sdfsdfds/athena.py ddfg
    -> dictionary = {'PID': [22091, 32581], 'PPID': [6672, 22091], 'PGID': [22091, 32581],
                     'COMMAND': ['bash', 'ps something;sdfsdfds/athena.py ddfg']}

    :param output: ps stdout (string).
    :param pattern: regex pattern matching the ps output (raw string).
    :return: dictionary.
    """

    dictionary = {}
    first_line = []

    for line in output.split('\n'):
        try:
            _l = re.sub(' +', ' ', line.strip())

            if not first_line:
                # header line: its words become the keys (stays empty for blank leading lines)
                first_line = [_f for _f in _l.split(' ') if _f]
                for key in first_line:
                    dictionary[key] = []
            else:
                match = re.search(pattern, _l)
                if match:
                    for i, key in enumerate(first_line):
                        var = match.group(i + 1)
                        try:
                            var = int(var)
                        except (TypeError, ValueError):
                            pass  # keep non-numeric columns (e.g. the command) as they are
                        dictionary[key].append(var)

        except Exception as error:
            # e.g. more header words than regex groups
            logger.warning("unexpected format of utility output: %s", error)

    return dictionary
0678
0679
def get_trimmed_dictionary(keys, dictionary):
    """
    Return a sub-dictionary holding only those of the given keys that exist in the full dictionary.

    Keys are inserted in the order in which they appear in `keys`; missing keys are silently skipped.

    :param keys: keys to keep (list).
    :param dictionary: full dictionary.
    :return: trimmed dictionary.
    """

    return {key: dictionary[key] for key in keys if key in dictionary}
0695
0696
def find_cmd_pids(cmd, ps_dictionary):
    """
    Find all pids for the given command.
    Example. cmd = 'athena.py' -> pids = [1234, 2267] (in case there are two pilots running on the WN).

    A pid is selected when cmd is a substring of the entry at the same position in the 'COMMAND' list.

    :param cmd: command (string).
    :param ps_dictionary: converted ps output (dictionary) with parallel 'PID' and 'COMMAND' lists.
    :return: list of matching pids, in listing order.
    """

    pid_column = ps_dictionary.get('PID')
    return [pid_column[index]
            for index, command in enumerate(ps_dictionary.get('COMMAND'))
            if cmd in command]
0713
0714
def find_pid(pandaid, ps_dictionary):
    """
    Find the process id for the command that contains 'export PandaID=%s'.

    :param pandaid: PanDA ID (string or int; formatted with %s).
    :param ps_dictionary: ps output dictionary with parallel 'PID' and 'COMMAND' lists.
    :return: pid (int) of the first matching command, or -1 if none matches.
    """

    needle = 'export PandaID=%s' % pandaid
    for index, command in enumerate(ps_dictionary.get('COMMAND')):
        if needle in command:
            return ps_dictionary.get('PID')[index]

    return -1
0734
0735
def is_child(pid, pandaid_pid, dictionary):
    """
    Is the given pid a (direct or indirect) child process of the pandaid_pid?

    Walk up the parent chain using the parallel 'PID' and 'PPID' lists of a converted ps dictionary. Return True
    as soon as pandaid_pid shows up as a parent. Return False when the chain leaves the listed processes, or loops
    back on itself (inconsistent listing).

    :param pid: process id to start from (int).
    :param pandaid_pid: process id of the presumed ancestor (int).
    :param dictionary: converted ps output (dictionary).
    :return: Boolean.
    """

    pids = dictionary.get('PID')
    ppids = dictionary.get('PPID')
    visited = set()
    while pid not in visited:
        visited.add(pid)
        try:
            index = pids.index(pid)
        except ValueError:
            # pid is not in the listing: the chain ended without reaching pandaid_pid
            return False
        pid = ppids[index]
        if pid == pandaid_pid:
            return True

    # the parent chain is cyclic
    return False
0758 return is_child(ppid, pandaid_pid, dictionary)