File indexing completed on 2026-04-10 08:39:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
import os
import re
import sys

try:
    # Python 3.3+: the container ABCs live in collections.abc (they were removed from collections in 3.10)
    from collections.abc import Set, Mapping
except ImportError:  # Python 2
    from collections import Set, Mapping
from collections import deque, OrderedDict
from numbers import Number
from time import sleep
0017
# Types whose instances get_size() treats as leaves, and the name of the dict method that yields
# (key, value) pairs, chosen for the running Python major version.
if sys.version_info[0] >= 3:
    zero_depth_bases = (str, bytes, Number, range, bytearray)
    iteritems = 'items'
else:
    zero_depth_bases = (basestring, Number, xrange, bytearray)  # noqa: F821 -- Python 2 only names
    iteritems = 'iteritems'
0024
0025 from pilot.util.constants import (
0026 SUCCESS,
0027 FAILURE,
0028 SERVER_UPDATE_FINAL,
0029 SERVER_UPDATE_NOT_DONE,
0030 SERVER_UPDATE_TROUBLE,
0031 get_pilot_version,
0032 )
0033
0034 from pilot.common.errorcodes import ErrorCodes
0035 from pilot.util.container import execute
0036 from pilot.util.filehandling import dump
0037
0038 import logging
# Module-level logger used by the helper functions in this file.
logger = logging.getLogger(__name__)

# Shared ErrorCodes instance; its attributes (errors.GENERALERROR, errors.SIGTERM, ...) are the pilot error
# codes used as keys of the exit code translation table built in get_error_code_translation_dictionary().
errors = ErrorCodes()
0042
0043
def pilot_version_banner():
    """
    Log a framed banner with the pilot version, followed by VM and OS/architecture information.

    :return:
    """

    log = logging.getLogger(__name__)

    title = '*** PanDA Pilot version %s ***' % get_pilot_version()
    frame = '*' * len(title)

    # frame, title, frame and an empty spacer line
    for text in (frame, title, frame, ''):
        log.info(text)

    if is_virtual_machine():
        log.info('pilot is running in a VM')

    display_architecture_info()
    log.info(frame)
0064
0065
def is_virtual_machine(cpuinfo="/proc/cpuinfo"):
    """
    Are we running in a virtual machine?
    If we are running inside a VM, then linux will put 'hypervisor' in cpuinfo. This function looks for the presence
    of that string.

    :param cpuinfo: path to the cpuinfo file (string); defaults to /proc/cpuinfo.
    :return: boolean. False also when the cpuinfo file cannot be read (e.g. hosts without /proc).
    """

    try:
        with open(cpuinfo, "r") as fd:
            # stream the file line by line and stop at the first hit instead of loading it with readlines()
            return any("hypervisor" in line for line in fd)
    except (IOError, OSError):
        # no /proc/cpuinfo (non-Linux host) or unreadable: a hypervisor cannot be detected, so report 'not a VM'
        # rather than crashing the caller (e.g. the version banner)
        return False
0086
0087
0088 def display_architecture_info():
0089 """
0090 Display OS/architecture information.
0091 The function attempts to use the lsb_release -a command if available. If that is not available,
0092 it will dump the contents of
0093
0094 :return:
0095 """
0096
0097 logger.info("architecture information:")
0098
0099 exit_code, stdout, stderr = execute("lsb_release -a", mute=True)
0100 if 'Command not found' in stdout or 'Command not found' in stderr:
0101
0102 dump("/etc/lsb-release")
0103 dump("/etc/SuSE-release")
0104 dump("/etc/redhat-release")
0105 dump("/etc/debian_version")
0106 dump("/etc/issue")
0107 dump("$MACHTYPE", cmd="echo")
0108 else:
0109 logger.info("\n%s", stdout)
0110
0111
def get_batchsystem_jobid():
    """
    Identify and return the batch system job id (will be reported to the server).

    Well-known batch system environment variables are checked in a fixed priority order; the first one
    that is set wins. If none is set but _CONDOR_JOB_AD is, the GlobalJobId is read from that HTCondor
    job ad file.

    :return: batch system name (string, or None if unknown), batch system job id (string).
    """

    # Ordered (environment variable, batch system) pairs. A tuple rather than a dict keeps the priority
    # deterministic on every Python version (dict order is arbitrary on Python 2).
    batchsystems = (
        ('QSUB_REQNAME', 'BQS'),
        ('BQSCLUSTER', 'BQS'),
        ('PBS_JOBID', 'Torque'),
        ('LSB_JOBID', 'LSF'),
        ('JOB_ID', 'Grid Engine'),
        ('clusterid', 'Condor'),
        ('SLURM_JOB_ID', 'SLURM'),
    )
    for key, name in batchsystems:
        if key in os.environ:
            return name, os.environ[key]

    if '_CONDOR_JOB_AD' in os.environ:
        return "Condor", _get_condor_global_job_id(os.environ.get('_CONDOR_JOB_AD'))

    return None, ""


def _get_condor_global_job_id(path):
    """
    Extract the quoted GlobalJobId value from an HTCondor job ad file.

    This is the in-process equivalent of the former  sed -n 's/^GlobalJobId.*"\\(.*\\)".*/\\1/p' <path>
    call. It avoids spawning a shell with an unquoted path taken from the environment.

    :param path: path to the job ad file (string).
    :return: the GlobalJobId value(s), newline-separated if several lines match; '' if none match or the file
             cannot be read (string).
    """

    pattern = re.compile(r'^GlobalJobId.*"(.*)".*')
    values = []
    try:
        with open(path) as fd:
            for line in fd:
                match = pattern.match(line)
                if match:
                    values.append(match.group(1))
    except (IOError, OSError, ValueError):
        # missing/unreadable/undecodable job ad: report no id instead of an error message as the job id
        return ''
    return '\n'.join(values)
0147
0148
def get_job_scheduler_id():
    """
    Return the job scheduler id taken from the PANDA_JSID environment variable.

    :return: job scheduler id, or 'unknown' when PANDA_JSID is not set (string).
    """
    jsid = os.environ.get("PANDA_JSID")
    return "unknown" if jsid is None else jsid
0156
0157
def get_pilot_id():
    """
    Return the pilot id taken from the GTAG environment variable.

    :return: pilot id, or 'unknown' when GTAG is not set (string).
    """
    gtag = os.environ.get("GTAG")
    return "unknown" if gtag is None else gtag
0166
0167
def whoami():
    """
    Return the name of the user the pilot runs as, i.e. the stdout of the `whoami` command.

    :return: whoami output (string).
    """

    _, user_name, _ = execute('whoami', mute=True)
    return user_name
0178
0179
def get_logger(job_id, log=None):
    """
    Return a child logger named after the given job id.
    The child is created with getChild() (a Python 2.7+ function) on `log`, or on the module logger when no
    `log` is given. If getChild() fails (e.g. Python 2.6, or a non-string job id), the parent logger itself is
    returned instead.

    WARNING: it seems using this function can lead to severe memory leaks (multiple GB) in some jobs. Do not use. Keep
    this definition for possible later investigation.

    :param job_id: PanDA job id (string).
    :param log: optional parent logger object (defaults to the module logger).
    :return: logger object.
    """

    parent = log if log else logger
    try:
        return parent.getChild(job_id)
    except Exception:
        return parent
0202
0203
def get_error_code_translation_dictionary():
    """
    Define the error code translation dictionary.

    Maps a pilot error code to [shell exit code, description]. The entry order matters:
    convert_to_pilot_error_code() returns the first pilot error code whose shell code matches, so for the
    shared shell code 68 errors.NOVOMSPROXY wins over errors.NOPROXY.

    :return: populated error code translation dictionary.
    """

    # NOTE(review): the shell codes for SIGXCPU/SIGUSR1/SIGBUS (158/144/138) are not 128+signum on Linux x86/ARM
    # (SIGXCPU=24, SIGUSR1=10, SIGBUS=7). They are kept as-is because Harvester maps them back via
    # convert_to_pilot_error_code() -- confirm they are intended.
    table = (
        (-1, 64, "Site offline"),
        (errors.GENERALERROR, 65, "General pilot error, consult batch log"),
        (errors.MKDIR, 66, "Could not create directory"),
        (errors.NOSUCHFILE, 67, "No such file or directory"),
        (errors.NOVOMSPROXY, 68, "Voms proxy not valid"),
        (errors.NOPROXY, 68, "Proxy not valid"),
        (errors.NOLOCALSPACE, 69, "No space left on local disk"),
        (errors.UNKNOWNEXCEPTION, 70, "Exception caught by pilot"),
        (errors.QUEUEDATA, 71, "Pilot could not download queuedata"),
        (errors.QUEUEDATANOTOK, 72, "Pilot found non-valid queuedata"),
        (errors.NOSOFTWAREDIR, 73, "Software directory does not exist"),
        (errors.JSONRETRIEVALTIMEOUT, 74, "JSON retrieval timed out"),
        (errors.BLACKHOLE, 75, "Black hole detected in file system"),
        (errors.MIDDLEWAREIMPORTFAILURE, 76, "Failed to import middleware module"),
        (errors.MISSINGINPUTFILE, 77, "Missing input file in SE"),
        (errors.PANDAQUEUENOTACTIVE, 78, "PanDA queue is not active"),
        (errors.KILLSIGNAL, 137, "General kill signal"),
        (errors.SIGTERM, 143, "Job killed by signal: SIGTERM"),
        (errors.SIGQUIT, 131, "Job killed by signal: SIGQUIT"),
        (errors.SIGSEGV, 139, "Job killed by signal: SIGSEGV"),
        (errors.SIGXCPU, 158, "Job killed by signal: SIGXCPU"),
        (errors.SIGUSR1, 144, "Job killed by signal: SIGUSR1"),
        (errors.SIGBUS, 138, "Job killed by signal: SIGBUS"),
    )

    # a fresh [shell code, description] list per entry and per call, in table order
    return dict((pilot_code, [shell_code, description]) for pilot_code, shell_code, description in table)
0238
0239
def shell_exit_code(exit_code):
    """
    Translate the pilot exit code to a proper exit code for the shell (wrapper).
    Any error code that is to be converted by this function, should be added to the traces object like:
      traces.pilot['error_code'] = errors.<ERRORCODE>
    The traces object will be checked by the pilot module.

    Codes present in the translation table map to their shell code; 0 maps to SUCCESS; any other code is
    reported on stdout and maps to FAILURE.

    :param exit_code: pilot error code (int).
    :return: standard shell exit code (int).
    """

    translation = get_error_code_translation_dictionary()

    if exit_code in translation:
        return translation[exit_code][0]

    if exit_code == 0:
        return SUCCESS

    print("no translation to shell exit code for error code %d" % exit_code)
    return FAILURE
0266
0267
def convert_to_pilot_error_code(exit_code):
    """
    Revert a batch system (shell) exit code back to a pilot error code.
    Note: the function is used by Harvester.

    When several pilot error codes share the shell code, the first one in the translation table is returned
    (and the ambiguity is reported on stdout). Unknown shell codes give -1.

    :param exit_code: batch system exit code (int).
    :return: pilot error code (int).
    """

    translation = get_error_code_translation_dictionary()
    candidates = [pilot_code for pilot_code, entry in translation.items() if entry[0] == exit_code]

    if not candidates:
        print('unknown exit code: %d (no matching pilot error code)' % exit_code)
        return -1

    if len(candidates) > 1:
        print('found multiple pilot error codes: %s' % candidates)
    return candidates[0]
0287
0288
def get_size(obj_0):
    """
    Recursively iterate to sum size of object & members.
    Note: for size measurement to work, the object must have set the data members in the __init__().

    Every object is counted at most once (tracked by id()), so shared members and reference cycles do not
    inflate the total. Instances of zero_depth_bases are leaves, and OrderedDict contents are not followed.

    :param obj_0: object to be measured.
    :return: size in Bytes (int).
    """

    _seen_ids = set()

    def inner(obj):
        obj_id = id(obj)
        if obj_id in _seen_ids:
            return 0

        _seen_ids.add(obj_id)
        size = sys.getsizeof(obj)
        if isinstance(obj, zero_depth_bases):
            pass  # leaf type: no members to follow
        elif isinstance(obj, OrderedDict):
            pass  # NOTE(review): OrderedDict contents are deliberately not followed; reason not visible here
        elif isinstance(obj, (tuple, list, Set, deque)):
            size += sum(inner(i) for i in obj)
        elif isinstance(obj, Mapping) or hasattr(obj, iteritems):
            try:
                size += sum(inner(k) + inner(v) for k, v in getattr(obj, iteritems)())
            except Exception:
                # best effort: an unrelated items()/iteritems() attribute may not be callable like dict's
                pass

        if hasattr(obj, '__dict__'):
            size += inner(vars(obj))
        if hasattr(obj, '__slots__'):
            slots = obj.__slots__
            # __slots__ may be a single string naming one slot; iterating it would yield its characters
            if isinstance(slots, (str, type(u''))):
                slots = (slots,)
            size += sum(inner(getattr(obj, s)) for s in slots if hasattr(obj, s))

        return size

    return inner(obj_0)
0331
0332
def get_pilot_state(job=None):
    """
    Return the current pilot (job) state.
    Without a (truthy) job object, the PILOT_JOB_STATE environment variable is read instead.

    :param job: optional job object.
    :return: pilot (job) state, or 'unknown' if neither source provides one (string).
    """

    if not job:
        return os.environ.get('PILOT_JOB_STATE', 'unknown')
    return job.state
0343
0344
def set_pilot_state(job=None, state=''):
    """
    Set the internal pilot state.
    Note: this function should update the global/singleton object but currently uses an environmental variable
    (PILOT_JOB_STATE).
    The environmental variable PILOT_JOB_STATE is always set, so the state is available even when the job object
    does not exist. job.state is updated only if the job has not already failed: a job in state 'failed' keeps it.

    :param job: optional job object.
    :param state: internal pilot state (string).
    :return:
    """

    os.environ['PILOT_JOB_STATE'] = state

    # NOTE(review): an earlier docstring claimed 'finished' jobs are protected as well, but only 'failed' is
    # protected here; confirm which behaviour callers rely on before extending this check.
    if job and job.state != 'failed':
        job.state = state
0362
0363
def check_for_final_server_update(update_server):
    """
    Do not set graceful stop if pilot has not finished sending the final job update.

    The SERVER_UPDATE environment variable is polled every 30 s, at most 20 times (i.e. up to 20*30 s),
    until it equals SERVER_UPDATE_FINAL or SERVER_UPDATE_TROUBLE. Nothing is waited for when the variable
    equals SERVER_UPDATE_NOT_DONE or when update_server is false.

    :param update_server: args.update_server boolean.
    :return:
    """

    max_i = 20

    if os.environ.get('SERVER_UPDATE', '') == SERVER_UPDATE_NOT_DONE:
        return
    if not update_server:
        return

    for attempt in range(max_i):
        if os.environ.get('SERVER_UPDATE', '') in (SERVER_UPDATE_FINAL, SERVER_UPDATE_TROUBLE):
            logger.info('server update done, finishing')
            return
        logger.info('server update not finished (#%d/#%d)', attempt + 1, max_i)
        sleep(30)
0390
0391
def is_python3():
    """
    Tell whether the interpreter is Python 3 or newer.

    :return: boolean.
    """

    major = sys.version_info[0]
    return major >= 3
0400
0401
def get_resource_name():
    """
    Return the lower-cased PILOT_RESOURCE_NAME (set only for HPC resources, e.g. Cori).

    :return: resource_name, or 'grid' when the variable is unset or empty (string).
    """

    name = os.environ.get('PILOT_RESOURCE_NAME', '').lower()
    return name or 'grid'
0413
0414
def get_object_size(obj, seen=None):
    """
    Recursively find the size of any objects.

    Each object is counted at most once (tracked by id() in `seen`), so shared and self-referencing members do
    not inflate the total. Text/bytes objects are leaves. One-shot iterators (generators etc.) are not walked,
    since walking them would consume the caller's data.

    :param obj: object.
    :param seen: set of already counted object ids (used by the recursion; leave as None).
    :return: size in Bytes (int).
    """

    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    seen.add(obj_id)

    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(get_object_size(v, seen) for v in obj.values())
        size += sum(get_object_size(k, seen) for k in obj.keys())
    elif hasattr(obj, '__dict__'):
        size += get_object_size(obj.__dict__, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray, type(u''))):
        # type(u'') also excludes Python 2 unicode, whose 1-char items would otherwise recurse without end
        try:
            iterator = iter(obj)
        except TypeError:
            # e.g. __iter__ = None (explicitly non-iterable): nothing to walk
            return size
        # an iterator returns itself from iter(); walking it would exhaust it for the caller
        if iterator is not obj:
            size += sum(get_object_size(item, seen) for item in iterator)

    return size
0441
0442
def show_memory_usage():
    """
    Log (debug level) the ps output and the RSS value for the current pilot process.

    :return:
    """

    _, ps_output, _ = get_memory_usage(os.getpid())

    try:
        rss = extract_memory_usage_value(ps_output)
    except Exception:
        rss = "(unknown)"

    logger.debug('current pilot memory usage:\n\n%s\n\nusage: %s kB\n', ps_output, rss)
0456
0457
def get_memory_usage(pid):
    """
    Return the memory usage string (ps aux -q <pid>) for the given process.

    :param pid: process id (int).
    :return: ps exit code (int), stdout (string), stderr (string), as returned by execute().
    """

    # NOTE(review): -q (quick pid selection) is a procps-ng option; it is likely unavailable in BSD/macOS ps
    return execute('ps aux -q %d' % pid)
0467
0468
def extract_memory_usage_value(output):
    """
    Extract the memory usage value (the RSS column, in kB) from the ps output.

    # USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
    # usatlas1 13917 1.5 0.0 1324968 152832 ? Sl 09:33 2:55 /bin/python2 ..
    # -> 152832 (kB)

    The first row whose sixth column is an integer wins (the header row is skipped because 'RSS' is not a
    number); 0 is returned when no row qualifies.

    :param output: ps output (string).
    :return: memory value in kB (int).
    """

    for line in output.split('\n'):
        fields = line.split()
        try:
            return int(fields[5])
        except (IndexError, ValueError):
            continue

    return 0
0491
0492
def cut_output(txt, cutat=1024, separator='\n[...]\n'):
    """
    Cut the given string if it is longer than 2*cutat characters.
    The first and last `cutat` characters are kept and joined by `separator`.

    :param txt: text to be cut at position cutat (string).
    :param cutat: max length of uncut text (int).
    :param separator: separator text (string).
    :return: cut text (string).
    """

    if len(txt) > 2 * cutat:
        # index the tail from the start: txt[-cutat:] would return the whole string when cutat == 0
        txt = txt[:cutat] + separator + txt[len(txt) - cutat:]

    return txt
0507
0508
def has_instruction_set(instruction_set, cpuinfo='/proc/cpuinfo'):
    """
    Determine whether a given CPU instruction set is available.
    The whitespace-separated words of /proc/cpuinfo (which include the CPU flags) are searched for the name in
    lower or upper case.

    :param instruction_set: instruction set (e.g. AVX2) (string).
    :param cpuinfo: path to the cpuinfo file (string); defaults to /proc/cpuinfo.
    :return: Boolean
    """

    # The file is read directly instead of running grep. The previous command was built from a raw string, so the
    # shell received literal \' characters and split the pattern at the space in "[^ ]*"; grep always failed.
    try:
        with open(cpuinfo) as fd:
            words = set(fd.read().split())
    except (IOError, OSError):
        # missing/unreadable cpuinfo (e.g. non-Linux host): report the instruction set as unavailable
        return False

    return instruction_set.lower() in words or instruction_set.upper() in words
0526
0527
def has_instruction_sets(instruction_sets, cpuinfo='/proc/cpuinfo'):
    """
    Determine which of the given CPU instruction sets are available.
    The whitespace-separated words of /proc/cpuinfo (which include the CPU flags) are searched for each name in
    lower or upper case.
    Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2"

    :param instruction_sets: instruction set names, e.g. ['AVX', 'AVX2'] (list of strings).
    :param cpuinfo: path to the cpuinfo file (string); defaults to /proc/cpuinfo.
    :return: '|'-separated upper-case names of the available instruction sets, in input order; '' if none (string).
    """

    # read the file directly rather than building a grep command line through the shell
    try:
        with open(cpuinfo) as fd:
            words = set(fd.read().split())
    except (IOError, OSError):
        return ''

    available = [name.upper() for name in instruction_sets if name.lower() in words or name.upper() in words]
    return '|'.join(available)
0551
0552
def locate_core_file(cmd=None, pid=None):
    """
    Build the expected path of the core file produced by gdb ($PILOT_HOME/core.<pid>, PILOT_HOME defaulting
    to '.') and log whether it exists.

    :param cmd: optional command containing pid corresponding to core file (string).
    :param pid: optional pid to use with core file (core.pid) (int).
    :return: path to core file, returned even if the file does not exist; None if no pid is known (string).
    """

    if not pid and cmd:
        pid = get_pid_from_command(cmd)

    if not pid:
        logger.warning('cannot locate core file since pid could not be extracted from command')
        return None

    filename = 'core.%d' % pid
    path = os.path.join(os.environ.get('PILOT_HOME', '.'), filename)
    if os.path.exists(path):
        logger.debug('found core file at: %s', path)
    else:
        logger.debug('did not find %s in %s', filename, path)

    return path
0577
0578
def get_pid_from_command(cmd, pattern=r'gdb --pid (\d+)'):
    """
    Identify an explicit process id in the given command.

    Example:
      cmd = 'gdb --pid 19114 -ex \'generate-core-file\''
      -> pid = 19114

    :param cmd: command containing a pid (string).
    :param pattern: regex pattern whose first group captures the pid (raw string).
    :return: pid, or None if the pattern does not match or its first group is not an integer (int).
    """

    found = re.search(pattern, cmd)
    if not found:
        print('no match for pattern \'%s\' in command=\'%s\'' % (pattern, cmd))
        return None

    try:
        return int(found.group(1))
    except Exception:
        return None
0603
0604
def list_hardware():
    """
    Execute lshw to list local hardware (display class, numeric ids).

    :return: lshw output, or '' if lshw is not installed (string).
    """

    exit_code, stdout, stderr = execute('lshw -numeric -C display', mute=True)
    # POSIX shells exit with 127 for an unknown command, and the message wording/case differs between shells
    # (bash: 'command not found', tcsh: 'Command not found'), so also match the text case-insensitively.
    # NOTE(review): assumes execute() returns the shell's exit status unchanged -- confirm.
    if exit_code == 127 or 'command not found' in ('%s\n%s' % (stdout, stderr)).lower():
        stdout = ''
    return stdout
0616
0617
def get_display_info():
    """
    Extract the product and vendor of the display device from the lshw output.
    E.g.
      product: GD 5446 [1013:B8]
      vendor: Cirrus Logic [1013]
      -> GD 5446, Cirrus Logic
    If several devices are listed, the last matching values win.

    :return: product (string), vendor (string); empty strings if unknown.
    """

    product, vendor = '', ''
    hardware = list_hardware()
    if not hardware:
        return product, vendor

    vendor_regex = re.compile(r'vendor\:\ (.+)\ .')
    product_regex = re.compile(r'product\:\ (.+)\ .')

    for row in hardware.split('\n'):
        if 'vendor' in row:
            found = vendor_regex.findall(row)
            if found:
                vendor = found[0]
        elif 'product' in row:
            found = product_regex.findall(row)
            if found:
                product = found[0]

    return product, vendor
0647
0648
def get_key_value(catchall, key='SOMEKEY'):
    """
    Return the value corresponding to key in catchall.
    catchall is split on whitespace and every 'name=value' token (split at the first '=') is considered;
    if a key occurs several times, the last occurrence wins.

    :param catchall: catchall free string.
    :param key: key name (string).
    :return: value, or None if the key is absent (string).
    """

    # scanning backwards makes the last occurrence win, like building a dict from all tokens would
    for token in reversed(catchall.split()):
        name, sep, value = token.partition('=')
        if sep and name == key:
            return value

    return None
0661
0662
def is_string(obj):
    """
    Determine if the passed object is a string (basestring on Python 2, str on Python 3).

    :param obj: object (object type).
    :return: True if obj is a string (Boolean).
    """

    try:
        string_types = basestring  # noqa: F821 -- Python 2 only
    except NameError:
        string_types = str

    return isinstance(obj, string_types)
0679 return value