Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:17

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0009 
0010 import os
0011 import re
0012 import sys
0013 
0014 from collections import Set, Mapping, deque, OrderedDict
0015 from numbers import Number
0016 from time import sleep
0017 
# Select, for the running interpreter, the types that get_size() treats as
# atomic (never descended into) and the name of the dict method used to
# iterate over key/value pairs.
if sys.version_info[0] < 3:
    zero_depth_bases = (basestring, Number, xrange, bytearray)  # Python 2  # noqa: F821
    iteritems = 'iteritems'
else:
    zero_depth_bases = (str, bytes, Number, range, bytearray)  # Python 3
    iteritems = 'items'
0024 
0025 from pilot.util.constants import (
0026     SUCCESS,
0027     FAILURE,
0028     SERVER_UPDATE_FINAL,
0029     SERVER_UPDATE_NOT_DONE,
0030     SERVER_UPDATE_TROUBLE,
0031     get_pilot_version,
0032 )
0033 
0034 from pilot.common.errorcodes import ErrorCodes
0035 from pilot.util.container import execute
0036 from pilot.util.filehandling import dump
0037 
0038 import logging
# module-level logger used by the functions in this file
logger = logging.getLogger(__name__)

# pilot error code definitions; referenced when building the error code translation dictionary
errors = ErrorCodes()
0042 
0043 
def pilot_version_banner():
    """
    Log the pilot version banner.
    The pilot version is logged framed by lines of asterisks, followed by a note if the pilot is running in a
    virtual machine, the OS/architecture information and a closing line of asterisks.

    :return:
    """

    log = logging.getLogger(__name__)

    banner = '***  PanDA Pilot version %s  ***' % get_pilot_version()
    frame = '*' * len(banner)
    for message in (frame, banner, frame, ''):
        log.info(message)

    if is_virtual_machine():
        log.info('pilot is running in a VM')

    display_architecture_info()
    log.info(frame)
0064 
0065 
def is_virtual_machine(path="/proc/cpuinfo"):
    """
    Are we running in a virtual machine?
    If we are running inside a VM, then linux will put 'hypervisor' in cpuinfo. This function looks for the presence
    of that.

    If the cpuinfo file cannot be opened or read (e.g. on a non-Linux host), the function returns False instead of
    raising, since a VM can then not be confirmed.

    :param path: path to the cpuinfo file to inspect (string).
    :return: boolean.
    """

    try:
        with open(path, "r") as fd:
            # scan line by line and stop at the first hit instead of loading the whole file
            return any("hypervisor" in line for line in fd)
    except (IOError, OSError):
        # missing or unreadable cpuinfo: treat as "not a VM" rather than aborting the caller
        return False
0086 
0087 
def display_architecture_info():
    """
    Log OS/architecture information.
    The output of the 'lsb_release -a' command is logged, unless stdout or stderr contains 'Command not found'. In that
    case the standard release files (/etc/lsb-release, /etc/SuSE-release, /etc/redhat-release, /etc/debian_version,
    /etc/issue) and the value of $MACHTYPE are dumped instead.

    :return:
    """

    logger.info("architecture information:")

    _, lsb_stdout, lsb_stderr = execute("lsb_release -a", mute=True)
    missing = 'Command not found'
    if missing not in lsb_stdout and missing not in lsb_stderr:
        logger.info("\n%s", lsb_stdout)
        return

    # lsb_release is not available: dump standard architecture info files instead
    release_files = (
        "/etc/lsb-release",
        "/etc/SuSE-release",
        "/etc/redhat-release",
        "/etc/debian_version",
        "/etc/issue",
    )
    for release_file in release_files:
        dump(release_file)
    dump("$MACHTYPE", cmd="echo")
0110 
0111 
def get_batchsystem_jobid():
    """
    Identify the batch system and return its job id (will be reported to the server).
    The known batch system environment variables are checked in a fixed order and the first one that is set wins. If
    none is set but _CONDOR_JOB_AD points to a job classad file, the GlobalJobId value is read from that file.

    :return: batch system name (string, or None if unknown), batch system job id (string, possibly empty).
    """

    # (environment variable, batch system name), checked in this order
    batchsystems = (
        ('QSUB_REQNAME', 'BQS'),  # BQS (e.g. LYON)
        ('BQSCLUSTER', 'BQS'),  # BQS alternative
        ('PBS_JOBID', 'Torque'),
        ('LSB_JOBID', 'LSF'),
        ('JOB_ID', 'Grid Engine'),  # Sun's Grid Engine
        ('clusterid', 'Condor'),  # Condor (variable sent through job submit file)
        ('SLURM_JOB_ID', 'SLURM'),
    )
    for variable, name in batchsystems:
        if variable in os.environ:
            return name, os.environ.get(variable, '')

    # Condor (get jobid from classad file)
    classad = os.environ.get('_CONDOR_JOB_AD')
    if classad is not None:
        # same expression as the previously used sed command: capture the quoted value on GlobalJobId lines
        pattern = re.compile(r'^GlobalJobId.*"(.*)".*')
        job_ids = []
        try:
            with open(classad, 'r') as handle:
                for line in handle:
                    match = pattern.match(line.rstrip('\n'))
                    if match:
                        job_ids.append(match.group(1))
        except (IOError, OSError) as exc:
            # report an empty job id rather than an error text when the classad file cannot be read
            logging.getLogger(__name__).warning('failed to read condor classad file %s: %s', classad, exc)
        return "Condor", "\n".join(job_ids)

    return None, ""
0147 
0148 
def get_job_scheduler_id():
    """
    Look up the job scheduler id in the environment variable PANDA_JSID.

    :return: job scheduler id (string), 'unknown' if the variable is not set.
    """

    try:
        return os.environ["PANDA_JSID"]
    except KeyError:
        return "unknown"
0156 
0157 
def get_pilot_id():
    """
    Look up the pilot id in the environment variable GTAG.

    :return: pilot id (string), 'unknown' if the variable is not set.
    """

    pilot_id = os.getenv("GTAG", "unknown")
    return pilot_id
0166 
0167 
def whoami():
    """
    Run the 'whoami' command and return its standard output.

    :return: whoami output (string).
    """

    _exit_code, username, _stderr = execute('whoami', mute=True)
    return username
0178 
0179 
def get_logger(job_id, log=None):
    """
    Return a child logger named after the given job id.
    The child is created with getChild() (available from Python 2.7) on the given logger, or on the module logger
    if none is given. If creating the child fails, the parent logger is returned instead.

    WARNING: it seems using this function can lead to severe memory leaks (multiple GB) in some jobs. Do not use. Keep
    this definition for possible later investigation.

    :param job_id: PanDA job id (string).
    :param log: optional parent logger object.
    :return: logger object.
    """

    parent = log if log else logger
    try:
        return parent.getChild(job_id)
    except Exception:
        # e.g. getChild() not available; fall back to the parent logger
        return parent
0202 
0203 
def get_error_code_translation_dictionary():
    """
    Build the dictionary used to translate pilot error codes into shell exit codes.
    FORMAT: { pilot_error_code : [ shell_exit_code, meaning ], .. }

    :return: populated error code translation dictionary.
    """

    # (pilot error code, shell exit code, meaning)
    translations = (
        (-1, 64, "Site offline"),
        (errors.GENERALERROR, 65, "General pilot error, consult batch log"),  # added to traces object
        (errors.MKDIR, 66, "Could not create directory"),  # added to traces object
        (errors.NOSUCHFILE, 67, "No such file or directory"),  # added to traces object
        (errors.NOVOMSPROXY, 68, "Voms proxy not valid"),  # added to traces object
        (errors.NOPROXY, 68, "Proxy not valid"),  # added to traces object
        (errors.NOLOCALSPACE, 69, "No space left on local disk"),  # added to traces object
        (errors.UNKNOWNEXCEPTION, 70, "Exception caught by pilot"),  # added to traces object
        (errors.QUEUEDATA, 71, "Pilot could not download queuedata"),  # tested
        (errors.QUEUEDATANOTOK, 72, "Pilot found non-valid queuedata"),  # not implemented yet, error code added
        (errors.NOSOFTWAREDIR, 73, "Software directory does not exist"),  # added to traces object
        (errors.JSONRETRIEVALTIMEOUT, 74, "JSON retrieval timed out"),  # ..
        (errors.BLACKHOLE, 75, "Black hole detected in file system"),  # ..
        (errors.MIDDLEWAREIMPORTFAILURE, 76, "Failed to import middleware module"),  # added to traces object
        (errors.MISSINGINPUTFILE, 77, "Missing input file in SE"),  # should pilot report this type of error to wrapper?
        (errors.PANDAQUEUENOTACTIVE, 78, "PanDA queue is not active"),
        (errors.KILLSIGNAL, 137, "General kill signal"),  # Job terminated by unknown kill signal
        (errors.SIGTERM, 143, "Job killed by signal: SIGTERM"),  # 128+15
        (errors.SIGQUIT, 131, "Job killed by signal: SIGQUIT"),  # 128+3
        (errors.SIGSEGV, 139, "Job killed by signal: SIGSEGV"),  # 128+11
        (errors.SIGXCPU, 158, "Job killed by signal: SIGXCPU"),  # 128+30
        (errors.SIGUSR1, 144, "Job killed by signal: SIGUSR1"),  # 128+16
        (errors.SIGBUS, 138, "Job killed by signal: SIGBUS"),  # 128+10
    )

    return {pilot_code: [shell_code, meaning] for pilot_code, shell_code, meaning in translations}
0238 
0239 
def shell_exit_code(exit_code):
    """
    Translate the pilot exit code to a proper exit code for the shell (wrapper).
    Any error code that is to be converted by this function, should be added to the traces object like:
      traces.pilot['error_code'] = errors.<ERRORCODE>
    The traces object will be checked by the pilot module.

    Pilot error codes without a translation are mapped to FAILURE (with a printed note), and 0 is mapped to SUCCESS.

    :param exit_code: pilot error code (int).
    :return: standard shell exit code (int).
    """

    # Translation table format: { pilot_error_code : [ shell_error_code, meaning ], .. }
    # User (pilot) exit codes are restricted to the range 64 - 113, as suggested by
    # http://tldp.org/LDP/abs/html/exitcodes.html
    # Exit code 137 is used for kill signal error codes (this actually means a hard kill signal 9, (128+9);
    # 128+2 would mean CTRL+C)
    translations = get_error_code_translation_dictionary()

    if exit_code in translations:
        # only the shell exit code is of interest here, not the error meaning
        return translations[exit_code][0]

    if exit_code == 0:
        return SUCCESS

    print("no translation to shell exit code for error code %d" % exit_code)
    return FAILURE
0266 
0267 
def convert_to_pilot_error_code(exit_code):
    """
    Revert a batch system exit code back to a pilot error code.
    If several pilot error codes share the given exit code, the first match is returned. If there is no match at all,
    -1 is returned. Both situations are reported with print().
    Note: the function is used by Harvester.

    :param exit_code: batch system exit code (int).
    :return: pilot error code (int).
    """

    translations = get_error_code_translation_dictionary()
    matches = [pilot_code for pilot_code, translation in translations.items() if translation[0] == exit_code]

    # note: do not use logging object as this function is used by Harvester
    if len(matches) > 1:
        print('found multiple pilot error codes: %s' % matches)
    if matches:
        return matches[0]

    print('unknown exit code: %d (no matching pilot error code)' % exit_code)
    return -1
0287 
0288 
def get_size(obj_0):
    """
    Sum the size of an object and, recursively, of its members.
    Each object is only counted once (objects are tracked by id). Types listed in zero_depth_bases and OrderedDict
    objects are not descended into.
    Note: for size measurement to work, the object must have set the data members in the __init__().

    :param obj_0: object to be measured.
    :return: size in Bytes (int).
    """

    visited = set()

    def measure(item):
        marker = id(item)
        if marker in visited:
            return 0  # already accounted for
        visited.add(marker)

        total = sys.getsizeof(item)
        if isinstance(item, zero_depth_bases) or isinstance(item, OrderedDict):
            # atomic types need no descent; OrderedDict can currently not be handled
            pass
        elif isinstance(item, (tuple, list, Set, deque)):
            total += sum(measure(member) for member in item)
        elif isinstance(item, Mapping) or hasattr(item, iteritems):
            try:
                total += sum(measure(key) + measure(value) for key, value in getattr(item, iteritems)())
            except Exception:
                # seen for <class 'collections.OrderedDict'>: unbound method iteritems() must be called
                # with OrderedDict instance as first argument (got nothing instead)
                pass

        # custom object instances - these may subclass the types handled above as well
        if hasattr(item, '__dict__'):
            total += measure(vars(item))
        if hasattr(item, '__slots__'):  # can have __slots__ with __dict__
            total += sum(measure(getattr(item, slot)) for slot in item.__slots__ if hasattr(item, slot))

        return total

    return measure(obj_0)
0331 
0332 
def get_pilot_state(job=None):
    """
    Return the current pilot (job) state.
    The state is taken from the job object if one is given; otherwise from the environmental variable
    PILOT_JOB_STATE ('unknown' if that is not set).

    :param job: optional job object.
    :return: pilot (job) state (string).
    """

    if job:
        return job.state
    return os.environ.get('PILOT_JOB_STATE', 'unknown')
0343 
0344 
def set_pilot_state(job=None, state=''):
    """
    Set the internal pilot state.
    Note: this function should update the global/singleton object but currently uses an environmental variable
    (PILOT_JOB_STATE).
    The environmental variable PILOT_JOB_STATE is always set, also when no job object is given.
    job.state is updated as well, unless it is already set to 'failed'.

    :param job: optional job object.
    :param state: internal pilot state (string).
    :return:
    """

    os.environ['PILOT_JOB_STATE'] = state

    if not job or job.state == 'failed':
        return
    job.state = state
0362 
0363 
def check_for_final_server_update(update_server):
    """
    Wait for the final server update to be sent before a graceful stop is set.
    The function returns immediately if the SERVER_UPDATE env variable is set to SERVER_UPDATE_NOT_DONE, or if
    update_server is not set. Otherwise it polls SERVER_UPDATE, sleeping 30 s between the checks (at most 20 times),
    until the variable equals SERVER_UPDATE_FINAL or SERVER_UPDATE_TROUBLE.

    :param update_server: args.update_server boolean.
    :return:
    """

    max_attempts = 20

    # nothing to wait for if the server update has not been started
    if os.environ.get('SERVER_UPDATE', '') == SERVER_UPDATE_NOT_DONE:
        return

    if not update_server:
        return

    for attempt in range(1, max_attempts + 1):
        current = os.environ.get('SERVER_UPDATE', '')
        if current in (SERVER_UPDATE_FINAL, SERVER_UPDATE_TROUBLE):
            logger.info('server update done, finishing')
            break
        logger.info('server update not finished (#%d/#%d)', attempt, max_attempts)
        sleep(30)
0390 
0391 
def is_python3():
    """
    Tell whether the interpreter is Python 3 (or later).

    :return: boolean.
    """

    major_version = sys.version_info[0]
    return major_version >= 3
0400 
0401 
def get_resource_name():
    """
    Return the lower-cased resource name from the environment variable PILOT_RESOURCE_NAME (only set for HPC
    resources; e.g. Cori). 'grid' is returned if the variable is unset or empty.

    :return: resource_name (string).
    """

    return os.environ.get('PILOT_RESOURCE_NAME', '').lower() or 'grid'
0413 
0414 
def get_object_size(obj, seen=None):
    """
    Recursively find the size of an object, counting every contained object only once.
    Dictionaries contribute their values and keys, objects with a __dict__ contribute that dictionary, and other
    iterables (except str, bytes and bytearray) contribute their items.

    :param obj: object.
    :param seen: ids of the objects already counted (set); created on the first call.
    :return: size in Bytes (int).
    """

    total = sys.getsizeof(obj)
    if seen is None:
        seen = set()

    marker = id(obj)
    if marker in seen:
        return 0

    # Register the object *before* descending into it, so that self-referential
    # objects do not lead to infinite recursion
    seen.add(marker)

    if isinstance(obj, dict):
        total += sum(get_object_size(value, seen) for value in obj.values())
        total += sum(get_object_size(key, seen) for key in obj.keys())
    elif hasattr(obj, '__dict__'):
        total += get_object_size(obj.__dict__, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
        total += sum(get_object_size(member, seen) for member in obj)

    return total
0441 
0442 
def show_memory_usage():
    """
    Log (debug level) the current memory usage of the pilot process.
    The ps output for the current process id is logged together with the memory value extracted from it, or
    '(unknown)' if the extraction fails.

    :return:
    """

    _exit_code, ps_output, _stderr = get_memory_usage(os.getpid())
    try:
        usage = extract_memory_usage_value(ps_output)
    except Exception:
        usage = "(unknown)"
    logger.debug('current pilot memory usage:\n\n%s\n\nusage: %s kB\n', ps_output, usage)
0456 
0457 
def get_memory_usage(pid):
    """
    Run 'ps aux -q <pid>' for the given process and return the result of the command execution.

    :param pid: process id (int).
    :return: ps exit code (int), stdout (string), stderr (string).
    """

    command = 'ps aux -q %d' % pid
    return execute(command)
0467 
0468 
def extract_memory_usage_value(output):
    """
    Extract the memory usage value from the ps output (in kB).
    The value is taken from the sixth column (RSS) of the first row where that column is an integer.

    # USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
    # usatlas1 13917  1.5  0.0 1324968 152832 ?      Sl   09:33   2:55 /bin/python2 ..
    # -> 152832 (kB)

    :param output: ps output (string).
    :return: memory value in kB (int), 0 if no value could be extracted.
    """

    for row in output.split('\n'):
        columns = row.split()
        try:
            return int(columns[5])
        except (IndexError, ValueError):
            # too few columns, or a non-numeric field such as the header row
            continue

    return 0
0491 
0492 
def cut_output(txt, cutat=1024, separator='\n[...]\n'):
    """
    Shorten the given string if it is longer than 2*cutat characters.
    The first and last cutat characters are kept and joined with the separator.

    :param txt: text to be cut (string).
    :param cutat: number of characters to keep at each end (int).
    :param separator: separator text (string).
    :return: cut text (string).
    """

    if len(txt) <= 2 * cutat:
        return txt

    head, tail = txt[:cutat], txt[-cutat:]
    return head + separator + tail
0507 
0508 
def has_instruction_set(instruction_set):
    """
    Determine whether a given CPU instruction set is available.
    The function will use grep to search in /proc/cpuinfo (both in upper and lower case).

    :param instruction_set: instruction set (e.g. AVX2) (string).
    :return: Boolean
    """

    lower, upper = instruction_set.lower(), instruction_set.upper()

    # The single quotes must reach grep's command line unescaped so that the pattern (which contains spaces) is
    # passed as one argument. A raw string with \' would send backslash-quote instead, which does not quote
    # anything. NOTE(review): assumes execute() hands the command to a shell, as has_instruction_sets() also does.
    cmd = "grep -o '%s[^ ]*\\|%s[^ ]*' /proc/cpuinfo" % (lower, upper)
    exit_code, stdout, stderr = execute(cmd)
    if exit_code or stderr:
        return False

    # require an exact flag match; grep also returns longer flags with the same prefix (e.g. avx2 for avx)
    flags = stdout.split()
    return lower in flags or upper in flags
0526 
0527 
def has_instruction_sets(instruction_sets):
    """
    Determine which of the given CPU instruction sets are available.
    The function will use grep to search in /proc/cpuinfo (both in upper and lower case).
    Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2"

    :param instruction_sets: instruction sets (e.g. ['AVX', 'AVX2']) (list of strings).
    :return: the found instruction sets in upper case, separated by '|' (string); empty string if none was found.
    """

    # one grep alternative per instruction set and case
    pattern = r'\|'.join(r'%s[^ ]*\|%s[^ ]*' % (name.lower(), name.upper()) for name in instruction_sets)
    cmd = "grep -o '%s' /proc/cpuinfo" % pattern

    exit_code, stdout, stderr = execute(cmd)
    if exit_code or stderr:
        return ''

    # only exact flag matches count; grep also returns longer flags with the same prefix
    flags = stdout.split()
    return '|'.join(name.upper() for name in instruction_sets if name.lower() in flags or name.upper() in flags)
0551 
0552 
def locate_core_file(cmd=None, pid=None):
    """
    Locate the core file produced by gdb.
    The core file is expected to be called core.<pid> and to be located in the PILOT_HOME directory (current
    directory if PILOT_HOME is not set). If no pid is given, it is extracted from the given command.
    Note: the path is returned also when the file does not exist (this is only logged).

    :param cmd: optional command containing pid corresponding to core file (string).
    :param pid: optional pid to use with core file (core.pid) (int).
    :return: path to core file (string), None if no pid is known.
    """

    if not pid and cmd:
        pid = get_pid_from_command(cmd)

    if not pid:
        logger.warning('cannot locate core file since pid could not be extracted from command')
        return None

    filename = 'core.%d' % pid
    path = os.path.join(os.environ.get('PILOT_HOME', '.'), filename)
    if os.path.exists(path):
        logger.debug('found core file at: %s', path)
    else:
        logger.debug('did not find %s in %s', filename, path)

    return path
0577 
0578 
def get_pid_from_command(cmd, pattern=r'gdb --pid (\d+)'):
    """
    Identify an explicit process id in the given command.
    The pid is taken from the first group of the regex pattern. A note is printed if the pattern does not match.

    Example:
        cmd = 'gdb --pid 19114 -ex \'generate-core-file\''
        -> pid = 19114

    :param cmd: command containing a pid (string).
    :param pattern: regex pattern (raw string).
    :return: pid (int), None if no pid could be extracted.
    """

    found = re.search(pattern, cmd)
    if not found:
        print('no match for pattern \'%s\' in command=\'%s\'' % (pattern, cmd))
        return None

    try:
        return int(found.group(1))
    except Exception:
        # e.g. a custom pattern without a (numerical) group
        return None
0603 
0604 
def list_hardware():
    """
    Execute lshw to list the local display hardware.

    :return: lshw output (string), empty string if stdout or stderr contains 'Command not found'.
    """

    _exit_code, listing, err_text = execute('lshw -numeric -C display', mute=True)
    marker = 'Command not found'
    return '' if marker in listing or marker in err_text else listing
0616 
0617 
def get_display_info():
    """
    Extract the product and vendor from the lshw command output.
    E.g.
           product: GD 5446 [1013:B8]
           vendor: Cirrus Logic [1013]
    -> GD 5446, Cirrus Logic

    If several vendor or product lines are present, the last one wins.

    :return: product (string), vendor (string).
    """

    product, vendor = '', ''

    listing = list_hardware()
    if not listing:
        return product, vendor

    vendor_pattern = re.compile(r'vendor\:\ (.+)\ .')
    product_pattern = re.compile(r'product\:\ (.+)\ .')

    for line in listing.split('\n'):
        if 'vendor' in line:
            hits = vendor_pattern.findall(line)
            if hits:
                vendor = hits[0]
        elif 'product' in line:
            hits = product_pattern.findall(line)
            if hits:
                product = hits[0]

    return product, vendor
0647 
0648 
def get_key_value(catchall, key='SOMEKEY'):
    """
    Return the value corresponding to key in catchall.
    The catchall string is expected to contain whitespace separated key=value pairs. If a key appears more than once,
    the last value is used.

    :param catchall: catchall free string.
    :param key: key name (string).
    :return: value (string), None if the key is not present.
    """

    pairs = {}
    for token in catchall.split():
        # ignore any non-key-value pairs that might be present in the catchall string
        if '=' in token:
            name, value = token.split('=', 1)
            pairs[name] = value

    return pairs.get(key)
0661 
0662 
def is_string(obj):
    """
    Determine if the passed object is a string or not.
    On Python 2 any basestring counts as a string, on Python 3 only str does.

    :param obj: object (object type).
    :return: True if obj is a string (Boolean).
    """

    try:
        string_type = basestring  # Python 2 # noqa: F821
    except NameError:
        string_type = str  # Python 3

    return isinstance(obj, string_type)