Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 from pathlib import Path
0002 from typing import Set,List
0003 from datetime import datetime
0004 from logging.handlers import RotatingFileHandler
0005 import subprocess
0006 import bisect # for binary search in sorted lists
0007 
0008 from simpleLogger import slogger, CustomFormatter, CHATTY, DEBUG, INFO, WARN, ERROR, CRITICAL  # noqa: F401
0009 from sphenixdbutils import test_mode as dbutils_test_mode
0010 
0011 # ============================================================================================
def shell_command(command: str) -> List[str]:
    """Run ``command`` through the shell and return its stdout as a list of tokens.

    Args:
        command: Complete command line. It is handed to the shell unchanged
            (``shell=True``), so pipes and variable expansion work; only pass
            trusted input.

    Returns:
        The whitespace-separated tokens of the command's standard output, or
        an empty list if the command exited with a non-zero status.
    """
    CHATTY(f"[shell_command] Command: {command}")
    ret = []
    try:
        completed = subprocess.run(command, shell=True, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # Deliberately best-effort: a failing command is reported, not raised.
        # The message is built as ONE string, like every other log call in this
        # file, so the exit code actually ends up in the log line.
        stderr_text = (e.stderr or b"").decode('utf-8', errors='replace').strip()
        WARN(f"[shell_command] Command failed with exit code: {e.returncode}. stderr: {stderr_text}")
    else:
        ret = completed.stdout.decode('utf-8').split()

    CHATTY(f"[shell_command] Return value length is {len(ret)}.")
    return ret
0025 
0026 # ============================================================================================
def setup_rot_handler(args):
    """Attach a daily-named rotating file handler to ``slogger``.

    The log directory is built from a base directory plus the rule name with
    any '.yaml' removed. The base is ``args.sublogdir`` when it is set;
    otherwise a default under /tmp is used (a separate testbed location when
    test mode is active). The directory is created if necessary.

    Args:
        args: Parsed command-line namespace. Reads ``test_mode``,
            ``sublogdir`` and ``rulename``.

    Returns:
        The log directory (str) that the handler writes into.
    """
    #################### Test mode?
    test_mode = dbutils_test_mode or args.test_mode

    if args.sublogdir:
        # Previously this case left ``sublogdir`` unassigned and crashed with
        # UnboundLocalError on the ``+=`` below.
        sublogdir = str(args.sublogdir)
        # NOTE(review): the defaults end in '/', so the rule name becomes a
        # subdirectory; a user-supplied base is assumed to want the same.
        if not sublogdir.endswith('/'):
            sublogdir += '/'
    elif test_mode:
        sublogdir = '/tmp/testbed/sphenixprod/'
    else:
        sublogdir = '/tmp/sphenixprod/sphenixprod/'

    sublogdir += f"{args.rulename}".replace('.yaml', '')
    Path(sublogdir).mkdir(parents=True, exist_ok=True)

    # One file per calendar day, rotated at 25 MiB with up to 10 backups.
    RotFileHandler = RotatingFileHandler(
        filename=f"{sublogdir}/{str(datetime.today().date())}.log",
        mode='a',
        maxBytes=25*1024*1024,
        backupCount=10,
        encoding=None,
        delay=False,
    )
    RotFileHandler.setFormatter(CustomFormatter())
    slogger.addHandler(RotFileHandler)

    return sublogdir
0052 
0053 # ============================================================================================
def should_I_quit(args, myname) -> bool:
    """Tell the caller whether another instance with the same parameters is running.

    The current user's process list is scanned for lines that contain the
    script name, ``args.config`` and ``args.rulename``. The calling process
    itself is expected to be among the matches.

    Args:
        args: Parsed command-line namespace. Reads ``config`` and ``rulename``.
        myname: Name of the running script as it appears in the process list.

    Returns:
        True if more than one matching process exists (the caller should
        stop), False if the caller is the only one.

    Raises:
        SystemExit: With status 1 if not even the calling process is found.
    """
    # Exit without fuss if we are already running
    ps_result = subprocess.run(
        "ps axuww | /usr/bin/grep $USER",
        shell=True,
        stdout=subprocess.PIPE,
    )
    process_lines = ps_result.stdout.decode(errors='ignore').splitlines()

    # Plain substring matching: a line counts when it mentions the script
    # name, the config file and the rule name. A rule name that is a
    # substring of another rule name therefore matches that one as well.
    count_already_running = sum(
        1
        for psline in process_lines
        if myname in psline and args.config in psline and args.rulename in psline
    )

    CHATTY ( f"Found {count_already_running} instance(s) of {myname} with config {args.config} and rulename {args.rulename} in the process list.")
    if count_already_running == 0:
        ERROR("No running instance found, including myself. That can't be right.")
        # Raise directly instead of calling exit(): that helper is injected by
        # the ``site`` module and is not available in every interpreter setup.
        raise SystemExit(1)

    if count_already_running > 1:
        DEBUG(f"Looks like there's already {count_already_running-1} running instance(s) of me. Suggest Stop.")
        return True

    return False
0077 
0078 # ============================================================================================
def unlock_file(file_path, dryrun: bool=True):
    """Remove the lock marker that belongs to ``file_path``.

    The marker is the path ``file_path + ".lock"``. The deletion is always
    logged, but it is only carried out when ``dryrun`` is False. A marker
    that does not exist is silently ignored.

    Args:
        file_path: Path (str) of the file whose lock should be released.
        dryrun: If True (the default), only log and leave the marker alone.
    """
    marker = file_path + ".lock"
    DEBUG(f"Deleting lock file {marker}")
    if dryrun:
        return
    Path(marker).unlink(missing_ok=True)
0084         
0085 # ============================================================================================
def lock_file(file_path, dryrun: bool=True, max_lock_age: int=4*60*60) -> bool:
    """Try to take the lock that guards ``file_path``.

    The lock is a marker file at ``file_path + ".lock"``. An existing marker
    blocks the caller unless it is older than ``max_lock_age`` seconds, in
    which case it is considered left over from a job that did not end
    gracefully and is taken over.

    Args:
        file_path: Path (str) of the file to guard.
        dryrun: If True (the default), only report whether the lock could be
            taken; nothing is created or modified on disk.
        max_lock_age: Age in seconds after which an existing lock is overridden.

    Returns:
        True if the lock was (or, in a dry run, could be) acquired, False if
        somebody else holds a sufficiently fresh lock.
    """
    import time  # local import: the module header does not import ``time``

    lock_name = file_path + ".lock"
    lock_path = Path(lock_name)

    # A single stat() answers both "does it exist?" and "how old is it?", so
    # a lock that vanishes in between simply counts as "not locked".
    try:
        mod_timestamp = lock_path.stat().st_mtime
    except FileNotFoundError:
        mod_timestamp = None

    overriding_stale_lock = False
    if mod_timestamp is not None:
        WARN(f"Lock file {lock_name} already exists.")
        # Safety valve. If the lock is old, we assume some job didn't end gracefully and proceed anyway.
        # Compare epoch seconds so a local clock change (DST) cannot skew the age.
        lock_age = time.time() - mod_timestamp
        if lock_age <= max_lock_age:
            return False
        WARN(f"lock file is already {lock_age} seconds old. Overriding.")
        overriding_stale_lock = True

    if dryrun:
        return True

    lock_path.parent.mkdir(parents=True, exist_ok=True)
    if overriding_stale_lock:
        # Taking over a stale lock: refresh its modification time.
        lock_path.touch()
        return True

    try:
        # Exclusive creation: if another process created the lock since the
        # check above, exactly one of the two wins.
        lock_path.touch(exist_ok=False)
    except FileExistsError:
        WARN(f"Lock file {lock_name} already exists.")
        return False

    return True
0104 
0105 # ============================================================================================
def read_batches(file_path, batch_size=1000):
    """Lazily read a text file and hand out its lines in groups.

    Every line has leading and trailing whitespace stripped. A group is
    yielded as soon as it holds ``batch_size`` lines; whatever is left over
    at the end of the file is yielded as a final, shorter group.

    Args:
        file_path: File to read (opened in text mode).
        batch_size: Number of lines per yielded list.

    Yields:
        Lists of stripped lines.
    """
    with open(file_path, 'r') as handle:
        pending = []
        for raw_line in handle:
            pending.append(raw_line.strip())
            if len(pending) != batch_size:
                continue
            yield pending
            pending = []  # start a fresh list; the yielded one belongs to the consumer
        # Hand out the incomplete tail, if any
        if pending:
            yield pending
0116 
0117 # ============================================================================================
def make_chunks(lst, n):
    """Split ``lst`` into consecutive slices of length ``n``.

    The last slice is shorter when ``len(lst)`` is not a multiple of ``n``.

    Args:
        lst: Any sliceable sequence with a length.
        n: Slice length.

    Yields:
        Slices ``lst[k*n:(k+1)*n]`` in order.
    """
    # source https://stackoverflow.com/questions/312443/how-do-i-split-a-list-into-equally-sized-chunks
    starts = range(0, len(lst), n)
    for begin in starts:
        end = begin + n
        yield lst[begin:end]
0123 
0124 # ============================================================================================
def binary_contains_bisect(arr, x):
    """Membership test on a sorted sequence using binary search.

    Args:
        arr: Sequence sorted in ascending order.
        x: Value to look for.

    Returns:
        True if ``x`` occurs in ``arr``, False otherwise.
    """
    idx = bisect.bisect_left(arr, x)
    # Insertion point past the end: nothing there to compare against.
    if idx == len(arr):
        return False
    return bool(arr[idx] == x)
0130 
0131 # ============================================================================================
def remove_empty_directories(dirs_to_del: Set[str]):
    """Remove the given directories if they are empty, then walk upwards.

    Each entry is removed when it is an empty directory. Afterwards its parent
    is inspected; if the parent is empty, it is queued as well, so a chain of
    directories that became empty is cleaned up from the bottom up.

    Args:
        dirs_to_del (Set): The directories to process. The set is used as the
            work queue (``pop()`` / ``add()``) and is empty when the function
            returns.
    """
    while dirs_to_del:
        current = Path(dirs_to_del.pop())
        CHATTY(f"Called for {current}")
        if not current.is_dir():
            continue
        # In principle the emptiness check is not needed, the directory should
        # be empty by definition.
        try:
            if not any(current.iterdir()):
                current.rmdir()
        except OSError as e:
            # This might occur due to permission issues, or if the directory is
            # unexpectedly not empty or already gone (race condition).
            WARN(f"Could not remove directory '{current}'. Reason: {e}")
            continue
        parent = current.parent
        # Check the parent, if empty, add to the set
        try:
            parent_is_empty = not any(parent.iterdir())
        except OSError as e:
            WARN(f"Could not inspect directory '{parent}'. Reason: {e}")
            continue
        if parent_is_empty:
            dirs_to_del.add(str(parent))