Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:23

0001 #!/usr/bin/env python3
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024
0010 
0011 
0012 import os
0013 import re
0014 import sys
0015 import subprocess
0016 
0017 
def get_my_username():
    """Return the name of the user running this script.

    ``os.getlogin()`` is tried first so the result is unchanged wherever it
    already worked.  It raises ``OSError`` when the process has no controlling
    terminal (cron jobs, daemons, batch services); in that case the name is
    taken from ``getpass.getuser()``, which consults the usual environment
    variables and then the password database.

    Returns:
        str: the user name.

    Raises:
        Whatever ``getpass.getuser()`` raises when neither lookup succeeds.
    """
    try:
        return os.getlogin()
    except OSError:
        # Imported here so the fallback needs no change to the module imports.
        import getpass

        return getpass.getuser()
0021 
0022 
def get_queued_jobs(username, partition, debug=False):
    """Count a user's jobs that are still waiting in one Slurm partition.

    Runs ``squeue -u <username> --partition=<partition>`` and counts the
    output rows whose fifth whitespace-separated column is ``PD`` or ``CF``.

    Args:
        username: user whose jobs are listed.
        partition: partition name passed to ``--partition``.
        debug: when true, print the command and the outcome.

    Returns:
        tuple: ``(status, num_pending_jobs)``.  ``status`` is ``True`` only
        when ``squeue`` ran and exited with code 0; otherwise the result is
        ``(False, 0)``.
    """
    # Build the argument vector directly so a value containing whitespace
    # stays a single argument instead of being torn apart by str.split().
    argv = ["squeue", "-u", str(username), f"--partition={partition}"]
    if debug:
        print(f"command: {argv}")

    try:
        proc = subprocess.run(argv, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as ex:
        # squeue missing or not executable: report it through the status flag
        # like any other failure rather than raising out of the function.
        if debug:
            print(f"failed to run squeue: {ex}")
        return False, 0

    if proc.returncode != 0:
        if debug:
            print(f"returncode: {proc.returncode}, stdout: {proc.stdout}, stderr: {proc.stderr}")
        return False, 0

    raw_stdout = proc.stdout
    if raw_stdout is None or isinstance(raw_stdout, str):
        stdout_str = raw_stdout or ""
    else:
        stdout_str = raw_stdout.decode(errors="replace")

    num_pending_jobs = 0
    for line in stdout_str.split("\n"):
        if len(line) == 0 or line.startswith("JobID") or line.startswith("--"):
            continue

        fields = line.split()
        # Whitespace-only or truncated rows have no fifth column; skip them
        # instead of failing with IndexError.
        if len(fields) < 5:
            continue

        if fields[4] in ("CF", "PD"):
            num_pending_jobs += 1

    if debug:
        print(f"number of pending jobs in partition {partition} with user {username}: {num_pending_jobs}")

    return True, num_pending_jobs
0058 
0059 
def create_new_submit_file(old_submit_file, new_submit_file, new_partition):
    """Write a copy of a submit file with every ``--partition=...`` retargeted.

    Every occurrence of ``--partition=<non-whitespace>`` in the text of
    *old_submit_file* is replaced with ``--partition=<new_partition>`` and the
    result is written to *new_submit_file*.  Text without such an occurrence
    is copied unchanged.

    Args:
        old_submit_file: path of the submit file to read.
        new_submit_file: path of the file to write (overwritten if present).
        new_partition: partition name to substitute.
    """
    with open(old_submit_file, 'r') as src:
        content = src.read()

    replacement = f'--partition={new_partition}'
    # A callable replacement is inserted literally; a plain string would be
    # treated as a template, so backslashes or group references in the
    # partition name would be interpreted or raise re.error.
    updated_content = re.sub(r'--partition=\S+', lambda _match: replacement, content)

    with open(new_submit_file, 'w') as dst:
        dst.write(updated_content)
0069 
0070 
if __name__ == "__main__":
    # Candidate partitions; the submit file is retargeted to whichever one
    # currently has the fewest pending jobs for this user.
    partitions = ['milano', 'roma']
    all_args = sys.argv
    parameters = all_args[1:]

    debug = False

    # Partition selection is best effort: any failure here leaves the
    # arguments untouched and sbatch is still invoked below.
    try:
        username = get_my_username()

        # Only partitions whose squeue query succeeded take part in the choice.
        num_pending_by_partition = {}
        for candidate in partitions:
            ok, pending = get_queued_jobs(username, candidate, debug=debug)
            if ok:
                num_pending_by_partition[candidate] = pending

        if debug:
            print(f"num_pending_by_partition: {num_pending_by_partition}")

        # min() returns the first entry with the smallest count, i.e. ties go
        # to the partition listed first.
        selected_partition = None
        if num_pending_by_partition:
            selected_partition = min(num_pending_by_partition, key=num_pending_by_partition.get)
        if debug:
            print(f"selected_partition: {selected_partition}")

        if selected_partition:
            # The last command-line argument is taken to be the submit file.
            submit_file = parameters[-1]
            new_submit_file = submit_file.strip() + ".new_local_sbatch.sh"
            create_new_submit_file(submit_file, new_submit_file, selected_partition)
            if debug:
                print(f"new_submit_file: {new_submit_file}")
            parameters[-1] = new_submit_file
    except Exception as ex:
        if debug:
            print(f"Exception: {ex}")

    # Hand over to the real sbatch and propagate its exit status.
    new_command = ['sbatch', *parameters]
    if debug:
        print(f"New command: {new_command}")

    result = subprocess.run(new_command)
    sys.exit(result.returncode)