File indexing completed on 2026-04-25 08:29:08
0001 from pathlib import Path
0002 from typing import Set,List
0003 from datetime import datetime
0004 from logging.handlers import RotatingFileHandler
0005 import subprocess
0006 import bisect
0007
0008 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL
0009 from sphenixdbutils import test_mode as dbutils_test_mode
0010
0011
def shell_command(command: str) -> List[str]:
    """Run ``command`` through the shell and return its stdout as tokens.

    Args:
        command: Command line handed to the shell unchanged (``shell=True``),
            so it must come from a trusted source.

    Returns:
        The standard output of the command, decoded as UTF-8 and split on
        whitespace. An empty list if the command exits with a non-zero
        status; that failure is logged as a warning, not raised.
    """
    CHATTY(f"[shell_command] Command: {command}")
    try:
        completed = subprocess.run(command, shell=True, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # Log one pre-formatted string, like every other log call in this
        # file. Passing the exit code as an extra positional argument would
        # (with logging-style semantics) be treated as a %-format argument
        # for a message without placeholders and never be rendered.
        WARN(f"[shell_command] Command failed with exit code: {e.returncode}")
        ret = []
    else:
        ret = completed.stdout.decode('utf-8').split()

    CHATTY(f"[shell_command] Return value length is {len(ret)}.")
    return ret
0025
0026
def setup_rot_handler(args):
    """Attach a rotating file handler to ``slogger`` and return its directory.

    The log directory is ``<base><rule>``, where ``<rule>`` is
    ``args.rulename`` with every ``.yaml`` occurrence removed. ``<base>`` is
    ``args.sublogdir`` when that is set, otherwise a default below ``/tmp``
    that depends on whether test mode is active (module-level
    ``dbutils_test_mode`` or ``args.test_mode``). The directory is created if
    needed and a log file named after today's date is opened inside it.

    Args:
        args: Object exposing ``test_mode``, ``sublogdir`` and ``rulename``
            attributes (presumably the parsed command line; confirm against
            the caller).

    Returns:
        The log directory as a string.
    """
    test_mode = dbutils_test_mode or args.test_mode

    if args.sublogdir:
        # An explicitly requested directory used to leave the local name
        # unassigned and crash with UnboundLocalError. Use it as the base.
        # NOTE(review): the rule name is still appended, mirroring the
        # default case -- confirm that this is the intended layout.
        base = str(args.sublogdir)
        if not base.endswith('/'):
            base += '/'
    elif test_mode:
        base = '/tmp/testbed/sphenixprod/'
    else:
        base = '/tmp/sphenixprod/sphenixprod/'

    sublogdir = base + f"{args.rulename}".replace('.yaml', '')
    Path(sublogdir).mkdir(parents=True, exist_ok=True)

    # Rotate at 25 MiB per file, keeping up to ten backups.
    rot_file_handler = RotatingFileHandler(
        filename=f"{sublogdir}/{datetime.today().date()}.log",
        mode='a',
        maxBytes=25*1024*1024,
        backupCount=10,
        encoding=None,
        delay=False,
    )
    rot_file_handler.setFormatter(CustomFormatter())
    slogger.addHandler(rot_file_handler)

    return sublogdir
0052
0053
def should_I_quit(args, myname) -> bool:
    """Report whether another matching instance shows up in the process list.

    The output of ``ps axuww`` (filtered by ``grep $USER``) is scanned for
    lines that contain all of ``myname``, ``args.config`` and
    ``args.rulename``. The calling process is expected to match as well, so
    exactly one hit means "only me".

    Args:
        args: Object exposing ``config`` and ``rulename`` string attributes.
        myname: Text identifying this program in a ``ps`` line.

    Returns:
        True if more than one matching line was found, False if exactly one.

    Raises:
        SystemExit: With status 1 if not even one matching line was found.
    """
    ps_result = subprocess.run(
        "ps axuww | /usr/bin/grep $USER",
        shell=True,
        stdout=subprocess.PIPE,
    )
    # Undecodable bytes in other processes' command lines are dropped.
    stdout_str = ps_result.stdout.decode(errors='ignore')

    count_already_running = sum(
        1
        for psline in stdout_str.splitlines()
        if myname in psline and args.config in psline and args.rulename in psline
    )

    CHATTY ( f"Found {count_already_running} instance(s) of {myname} with config {args.config} and rulename {args.rulename} in the process list.")
    if count_already_running == 0:
        ERROR("No running instance found, including myself. That can't be right.")
        # Raise directly: the exit() helper is injected by the site module
        # and is not guaranteed to exist (e.g. under ``python -S``).
        raise SystemExit(1)

    if count_already_running > 1:
        DEBUG(f"Looks like there's already {count_already_running-1} running instance(s) of me. Suggest Stop.")
        return True

    return False
0077
0078
def unlock_file(file_path, dryrun: bool=True):
    """Delete the ``<file_path>.lock`` marker belonging to ``file_path``.

    The deletion is always logged at debug level but only carried out when
    ``dryrun`` is false. A lock file that does not exist is not an error.

    Args:
        file_path: Path string of the protected file; ``.lock`` is appended.
        dryrun: When true (the default), nothing is removed.
    """
    lock_name = file_path + ".lock"
    DEBUG(f"Deleting lock file {lock_name}")
    if dryrun:
        return
    Path(lock_name).unlink(missing_ok=True)
0084
0085
def lock_file(file_path, dryrun: bool=True, max_lock_age: int=4*60*60) -> bool:
    """Try to take the ``<file_path>.lock`` marker for ``file_path``.

    An existing lock is respected unless its modification time is more than
    ``max_lock_age`` seconds in the past, in which case it is overridden.

    Args:
        file_path: Path string of the file to protect; ``.lock`` is appended.
        dryrun: When true (the default), the lock file is not created or
            touched; the return value is still computed.
        max_lock_age: Age in seconds beyond which an existing lock is
            considered stale.

    Returns:
        True if the lock was (or, in a dry run, would have been) taken,
        False if a sufficiently recent lock already exists.
    """
    import time  # function scope: not among the file-level imports

    lock_name = file_path + ".lock"
    lock_path = Path(lock_name)

    # Ask for the mtime directly instead of exists()-then-stat(): the lock
    # may be removed by its owner between the two calls.
    try:
        mod_timestamp = lock_path.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError):
        mod_timestamp = None

    if mod_timestamp is not None:
        WARN(f"Lock file {lock_name} already exists.")
        # Compare epoch seconds; subtracting naive local datetimes is off by
        # an hour whenever a DST change lies between the two instants.
        lock_age = time.time() - mod_timestamp
        if lock_age <= max_lock_age:
            return False
        WARN(f"lock file is already {lock_age} seconds old. Overriding.")

    if not dryrun:
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        lock_path.touch()  # also refreshes the mtime of an overridden lock

    return True
0104
0105
def read_batches(file_path, batch_size=1000):
    """Yield the stripped lines of a text file in lists of ``batch_size``.

    Every line has surrounding whitespace removed. All yielded lists hold
    exactly ``batch_size`` entries, except possibly the last one, which
    carries whatever is left over. An empty file yields nothing.

    Args:
        file_path: Path of the text file to read.
        batch_size: Number of lines per yielded list.
    """
    with open(file_path, 'r') as handle:
        pending = []
        for raw_line in handle:
            pending.append(raw_line.strip())
            if len(pending) != batch_size:
                continue
            yield pending
            pending = []
        # Hand out the incomplete remainder, if any.
        if pending:
            yield pending
0116
0117
def make_chunks(lst, n):
    """Generate consecutive slices of ``lst``, each at most ``n`` items long.

    The slices are taken at offsets 0, n, 2n, ...; only the last one can be
    shorter than ``n``.
    """
    yield from (lst[start:start + n] for start in range(0, len(lst), n))
0123
0124
def binary_contains_bisect(arr, x):
    """Return True if ``x`` occurs in the sorted sequence ``arr``.

    Uses a binary search for the leftmost insertion point of ``x`` and
    checks whether the element found there equals ``x``. ``arr`` must
    already be sorted for the answer to be meaningful.
    """
    idx = bisect.bisect_left(arr, x)
    if idx == len(arr):
        # Insertion point is past the end: every element is smaller than x.
        return False
    return bool(arr[idx] == x)
0130
0131
def remove_empty_directories(dirs_to_del: Set[str]):
    """
    Remove the given directories if they are empty, then climb upwards.

    Each entry is taken out of the set. If it is an empty directory it is
    removed, and when that leaves its parent empty the parent is queued in
    the same set, so chains of directories that become empty are cleaned up
    towards the root. Entries that are not directories, or that are not
    empty, are skipped. Subdirectories are NOT descended into.

    Args:
        dirs_to_del (Set): The directories to process. Used as the work
            queue (pop() and add()), so the caller's set is empty on return.
    """
    while dirs_to_del:
        current = Path(dirs_to_del.pop())
        CHATTY(f"Called for {current}")
        if not current.is_dir():
            continue

        # Listing can fail just like removal (permissions, concurrent
        # deletion), so both are covered by the same handler.
        try:
            if any(current.iterdir()):
                continue
            current.rmdir()
        except OSError as e:
            WARN(f"Could not remove directory '{current}'. Reason: {e}")
            continue

        parent = current.parent
        try:
            parent_is_empty = not any(parent.iterdir())
        except OSError as e:
            WARN(f"Could not inspect directory '{parent}'. Reason: {e}")
            continue
        if parent_is_empty:
            dirs_to_del.add(str(parent))