Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import datetime
0002 import subprocess
0003 
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 from pandaharvester.harvestercore.work_spec import WorkSpec as ws
0007 
0008 # logger
0009 baseLogger = core_utils.setup_logger("titan_utils")
0010 
0011 
class TitanUtils(PluginBase):
    """Scheduler (MOAB) helper plugin for the OLCF Titan machine.

    Wraps `checkjob` / `showbf` command-line calls to report batch-job state
    and available backfill resources to the harvester.
    """

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info("Titan utils initiated")

    def get_batchjob_info(self, batchid):
        """
        Collect job info from the scheduler.

        :param batchid: batch (MOAB) job id
        :return: dictionary with worker status, native status/exit code/message
                 and start/finish times; empty dict if `checkjob` failed
        """
        tmpLog = self.make_logger(baseLogger, method_name="get_batchjob_info")
        res = {}
        tmpLog.info(f"Collect job info for batchid {batchid}")
        info_dict = self.get_moabjob_info(batchid)
        tmpLog.info(f"Got: {info_dict}")
        if info_dict:
            tmpLog.debug("Translate results")
            res["status"] = self.translate_status(info_dict["state"])
            res["nativeStatus"] = info_dict["state"]
            res["nativeExitCode"] = info_dict["exit_code"]
            res["nativeExitMsg"] = self.get_message(info_dict["exit_code"])
            res["start_time"] = self.fixdate(info_dict["start_time"])
            res["finish_time"] = self.fixdate(info_dict["finish_time"])
        tmpLog.info(f"Collected job info: {res}")
        return res

    def get_moabjob_info(self, batchid):
        """
        Parse `checkjob -v` output to get job state, exit code, start time and
        finish time (if available).

        :param batchid: batch (MOAB) job id
        :return: job_info dictionary (empty dict if the command failed)
        """
        tmpLog = self.make_logger(baseLogger, method_name="get_moabjob_info")

        job_info = {"state": "", "exit_code": None, "queued_time": None, "start_time": None, "finish_time": None}

        cmd = f"checkjob -v {batchid}"
        # universal_newlines=True: get str (not bytes) so the startswith/split
        # string parsing below works under Python 3
        p = subprocess.Popen(cmd.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        # check return code
        stdOut, stdErr = p.communicate()
        retCode = p.returncode
        checkjob_str = ""
        if retCode == 0:
            checkjob_str = stdOut
        else:
            tmpLog.info(f"checkjob failed with errcode: {retCode}\nstdout:{stdOut}\nstderr:{stdErr}")
            return {}
        if checkjob_str:
            checkjob_out = checkjob_str.splitlines()
            for l in checkjob_out:
                if l.startswith("State: "):
                    # e.g. "State: Completed  (...)" -> "Completed"
                    job_info["state"] = l[7:].split()[0]
                elif l.startswith("Completion Code: "):
                    job_info["exit_code"] = int(l[17:].split()[0])
                    # completion line may also carry the finish timestamp
                    if "Time: " in l:
                        job_info["finish_time"] = l[l.index("Time: ") + 6 :]
                elif l.startswith("StartTime: "):
                    job_info["start_time"] = l[11:]
                elif l.startswith("WallTime: "):
                    tmpLog.info(l)
        tmpLog.debug(f"checkjob parsing results: {job_info}")
        return job_info

    def translate_status(self, status):
        """
        Translate a MOAB status string to a harvester worker status.

        :param status: MOAB job state (case-insensitive)
        :return: one of the WorkSpec ST_* constants (unknown states map to finished)
        """
        submitted = ["deferred", "hold", "idle", "migrated", "staged"]
        running = ["starting", "running", "suspended", "canceling"]
        finished = ["completed"]
        cancelled = ["removed"]
        failed = ["vacated"]
        status = status.lower()
        if status in submitted:
            return ws.ST_submitted
        elif status in running:
            return ws.ST_running
        elif status in finished:
            return ws.ST_finished
        elif status in cancelled:
            return ws.ST_cancelled
        elif status in failed:
            return ws.ST_failed
        else:
            return ws.ST_finished

    def get_message(self, exit_code):
        """
        Map a scheduler exit code to a human-readable message.

        :param exit_code: integer exit code reported by MOAB/PBS (may be None)
        :return: descriptive message, or a generic note for unknown codes
        """
        codes_messages = {
            0: "No errors reported",
            1: "The last command the job ran was unsuccessful or job exited with error (SIGHUP)",
            2: "The last command the job ran was unsuccessful or job exited with error (SIGINT)",
            3: "The last command the job ran was unsuccessful or job exited with error (SIGQUIT)",
            11: "Job exited with segmentation fault (SIGSEGV)",
            126: "Command invoked cannot execute",
            127: "Command not found",
            128: "Invalid argument to exit",
            134: "Fortran floating point exception or other runtime error (SIGABRT)",
            137: "Job stopped by user (SIGKILL) or for exceeding a memory limit (stopped by the kernel)",
            139: "Invalid memory access attempt (SIGSEGV)",
            143: "Job stopped by user (SIGTERM)",
            265: "Job stopped by user (SIGKILL)",
            271: "Job stopped by user (SIGTERM)",
            -1: "Job execution failed, before files, no retry",
            -2: "Job execution failed, after files, no retry",
            -3: "Job execution failed, do retry",
            -4: "Job aborted on MOM initialization",
            -5: "Job aborted on MOM init, chkpt, no migrate",
            -6: "Job aborted on MOM init, chkpt, ok migrate",
            -7: "Job restart failed",
            -8: "Exec() of user command failed",
            -9: "Could not create/open stdout stderr files",
            -10: "Job exceeded a memory limit (stopped by the resource manager)",
            -11: "Job exceeded a walltime limit",
            -12: "Job exceeded a CPU time limit",
            -13: "Could not create the jobs control groups (cgroups)",
        }

        return codes_messages.get(exit_code, "No message for this code")

    def fixdate(self, date_str):
        """
        The MOAB date format has no year field; append one so the string parses.

        The current year is assumed; if that puts the date in the future the
        previous year is used instead (the event cannot be in the future).

        :param date_str: date string like "Mon Jan 01 12:00:00" (may be empty/None)
        :return: timezone-shifted datetime object, or None for empty input
        """
        tmpLog = self.make_logger(baseLogger, method_name="fixdate")
        if not date_str:
            return None
        tmpLog.debug(f"Date to fix: {date_str}")
        format_str = "%a %b %d %H:%M:%S %Y"
        now = datetime.datetime.now()
        date = datetime.datetime.strptime(f"{date_str} {now.year}", format_str)
        if date > now:
            # event in the future is impossible -> it happened last year.
            # NB: re-parse from the ORIGINAL string; appending a second year to
            # the already-extended string (as the old code did) breaks strptime
            date = datetime.datetime.strptime(f"{date_str} {now.year - 1}", format_str)

        tmpLog.debug(f"Full date: {str(date)}")
        utc_offset = datetime.timedelta(0, 18000, 0)  # 5H UTC offset for Oak-Ridge
        tmpLog.debug(f"UTC offset: {str(utc_offset)}")
        fixed_date = date + utc_offset
        tmpLog.debug(f"Fixed date: {str(fixed_date)}")

        return fixed_date

    def get_resources(self):
        """
        Function to provide number of nodes with walltime limit to worker maker.

        :return: (nodes, walltime) - integer node count (0 if below the
                 configured minimum) and walltime in seconds
        """
        tmpLog = self.make_logger(baseLogger, method_name="get_resources")
        tmpLog.info(f"Looking for gap more than '{self.minWalltime}' sec")
        nodes = 0
        walltime = self.minWalltime

        backfill = self.get_backfill()
        if backfill:
            # pick the largest slot that satisfies the minimum walltime
            for n in sorted(backfill.keys(), reverse=True):
                if self.minWalltime <= backfill[n] and nodes <= n:
                    nodes = n
                    walltime = backfill[n] - 120  # keep a 2-min safety margin
                    break

        if nodes < self.minNodes:
            nodes = 0
        tmpLog.info(f"Nodes: {nodes} Walltime: {walltime}")

        return nodes, walltime

    def get_backfill(self):
        """
        Collect information about currently available resources and return a
        mapping {nodes: walltime_sec}, with walltime capped according to the
        Titan scheduling policy.

        :return: dict of node count to max walltime in seconds (empty on failure)
        """
        tmpLog = self.make_logger(baseLogger, method_name="get_backfill")
        res = {}
        cmd = f"showbf --blocking -p {self.partition}"
        # universal_newlines=True: decode stdout to str for the parsing below
        p = subprocess.Popen(cmd.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        # check return code
        stdOut, stdErr = p.communicate()
        retCode = p.returncode
        tmpLog.info(f"retCode={retCode}")
        showbf_str = ""
        if retCode == 0:
            showbf_str = stdOut
        else:
            tmpLog.error(f"showbf failed with errcode: {retCode} stdout: {stdOut} stderr: {stdErr}")
            return res

        tmpLog.debug(f"Available resources in {self.partition} partition\n{showbf_str}")
        if showbf_str:
            shobf_out = showbf_str.splitlines()
            tmpLog.info("Fitted resources")
            # skip the two header lines of showbf output
            for l in shobf_out[2:]:
                d = l.split()
                nodes = int(d[2])
                if not d[3] == "INFINITY":
                    walltime_arr = d[3].split(":")
                    if len(walltime_arr) < 4:
                        # HH:MM:SS form
                        walltime_sec = int(walltime_arr[0]) * (60 * 60) + int(walltime_arr[1]) * 60 + int(walltime_arr[2])
                        if walltime_sec > 24 * 3600:  # in case we will have more than 24H
                            walltime_sec = 24 * 3600
                    else:
                        # DD:HH:MM:SS form - more than a day, cap at 24H
                        walltime_sec = 24 * 3600
                else:
                    walltime_sec = 24 * 3600  # max walltime for Titan

                # Fitting Titan policy
                # https://www.olcf.ornl.gov/kb_articles/titan-scheduling-policy/
                if self.maxNodes:
                    nodes = self.maxNodes if nodes > self.maxNodes else nodes

                if nodes < 125 and walltime_sec > 2 * 3600:  # less than 125 nodes, max 2H
                    walltime_sec = 2 * 3600
                elif nodes < 312 and walltime_sec > 6 * 3600:  # between 125 and 312 nodes, max 6H
                    walltime_sec = 6 * 3600
                elif nodes < 3749 and walltime_sec > 12 * 3600:  # between 312 and 3749 nodes, max 12H
                    walltime_sec = 12 * 3600

                tmpLog.info(f"Nodes: {nodes}, Walltime (str): {d[3]}, Walltime (sec) {walltime_sec}")

                res.update({nodes: walltime_sec})

        return res