File indexing completed on 2026-04-20 07:58:59
0001 import datetime
0002 import subprocess
0003
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 from pandaharvester.harvestercore.work_spec import WorkSpec as ws
0007
0008
0009 baseLogger = core_utils.setup_logger("titan_utils")  # module-level base logger; every TitanUtils method passes it to self.make_logger()
0010
0011
class TitanUtils(PluginBase):
    """Helper plugin that queries the MOAB scheduler through its command-line tools.

    ``checkjob`` is used to read the state of a single batch job and ``showbf``
    to discover backfill slots.

    The methods read ``self.partition``, ``self.minWalltime``, ``self.minNodes``
    and ``self.maxNodes``; none of them is assigned in this class.
    NOTE(review): presumably they are injected by ``PluginBase`` from the
    keyword arguments / plugin configuration -- confirm.
    """

    # Upper bound applied to every backfill slot, in seconds.
    _MAX_WALLTIME_SEC = 24 * 3600

    # (exclusive node limit, walltime cap in seconds), evaluated in order; the
    # first tier whose node limit exceeds the slot size caps the walltime.
    # NOTE(review): presumably mirrors the site's queue policy -- confirm.
    _WALLTIME_CAPS = (
        (125, 2 * 3600),
        (312, 6 * 3600),
        (3749, 12 * 3600),
    )

    # Seconds subtracted from the chosen backfill slot in get_resources().
    # NOTE(review): looks like a safety margin -- confirm.
    _WALLTIME_MARGIN_SEC = 120

    # Human readable explanations for the completion codes reported by checkjob.
    _EXIT_CODE_MESSAGES = {
        0: "No errors reported",
        1: "The last command the job ran was unsuccessful or job exited with error (SIGHUP)",
        2: "The last command the job ran was unsuccessful or job exited with error (SIGINT)",
        3: "The last command the job ran was unsuccessful or job exited with error (SIGQUIT)",
        11: "Job exited with segmentation fault (SIGSEGV)",
        126: "Command invoked cannot execute",
        127: "Command not found",
        128: "Invalid argument to exit",
        134: "Fortran floating point exception or other runtime error (SIGABRT)",
        137: "Job stopped by user (SIGKILL) or for exceeding a memory limit (stopped by the kernel)",
        139: "Invalid memory access attempt (SIGSEGV)",
        143: "Job stopped by user (SIGTERM)",
        265: "Job stopped by user (SIGKILL)",
        271: "Job stopped by user (SIGTERM)",
        -1: "Job execution failed, before files, no retry",
        -2: "Job execution failed, after files, no retry",
        -3: "Job execution failed, do retry",
        -4: "Job aborted on MOM initialization",
        -5: "Job aborted on MOM init, chkpt, no migrate",
        -6: "Job aborted on MOM init, chkpt, ok migrate",
        -7: "Job restart failed",
        -8: "Exec() of user command failed",
        -9: "Could not create/open stdout stderr files",
        -10: "Job exceeded a memory limit (stopped by the resource manager)",
        -11: "Job exceeded a walltime limit",
        -12: "Job exceeded a CPU time limit",
        -13: "Could not create the jobs control groups (cgroups)",
    }

    def __init__(self, **kwarg):
        """Initialise the base plugin with ``kwarg`` and log that the helper is ready."""
        PluginBase.__init__(self, **kwarg)
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info("Titan utils initiated")

    def get_batchjob_info(self, batchid):
        """Collect job info from the scheduler and translate it for harvester.

        :param batchid: identifier of the batch job, passed to ``checkjob``
        :return: dictionary with the keys ``status``, ``nativeStatus``,
                 ``nativeExitCode``, ``nativeExitMsg``, ``start_time`` and
                 ``finish_time``; an empty dictionary when ``checkjob`` failed
        """
        tmpLog = self.make_logger(baseLogger, method_name="get_batchjob_info")
        res = {}
        tmpLog.info(f"Collect job info for batchid {batchid}")
        info_dict = self.get_moabjob_info(batchid)
        tmpLog.info(f"Got: {info_dict}")
        if info_dict:
            tmpLog.debug("Translate results")
            res["status"] = self.translate_status(info_dict["state"])
            res["nativeStatus"] = info_dict["state"]
            res["nativeExitCode"] = info_dict["exit_code"]
            res["nativeExitMsg"] = self.get_message(info_dict["exit_code"])
            res["start_time"] = self.fixdate(info_dict["start_time"])
            res["finish_time"] = self.fixdate(info_dict["finish_time"])
        tmpLog.info(f"Collected job info: {res}")
        return res

    def get_moabjob_info(self, batchid):
        """Run ``checkjob -v`` and parse state, exit code and timestamps from its output.

        :param batchid: identifier of the batch job
        :return: dictionary with the keys ``state``, ``exit_code``, ``queued_time``,
                 ``start_time`` and ``finish_time`` (values keep their defaults when
                 the corresponding line is missing; ``queued_time`` is never filled);
                 an empty dictionary when ``checkjob`` exits with a non-zero code
        """
        tmpLog = self.make_logger(baseLogger, method_name="get_moabjob_info")

        job_info = {"state": "", "exit_code": None, "queued_time": None, "start_time": None, "finish_time": None}

        cmd = f"checkjob -v {batchid}"
        # Text mode is required: without it communicate() returns bytes on
        # Python 3 and every str-based comparison below raises TypeError.
        p = subprocess.Popen(
            cmd.split(),
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            errors="replace",
        )

        stdOut, stdErr = p.communicate()
        retCode = p.returncode
        if retCode != 0:
            tmpLog.info(f"checkjob failed with errcode: {retCode}\nstdout:{stdOut}\nstderr:{stdErr}")
            return {}

        for line in stdOut.splitlines():
            if line.startswith("State: "):
                fields = line[7:].split()
                if fields:
                    job_info["state"] = fields[0]
            elif line.startswith("Completion Code: "):
                fields = line[17:].split()
                try:
                    job_info["exit_code"] = int(fields[0])
                except (IndexError, ValueError):
                    # keep the default (None) instead of aborting the whole parse
                    tmpLog.error(f"Cannot parse completion code from: {line}")
                # the completion line may carry the finish timestamp as well
                if "Time: " in line:
                    job_info["finish_time"] = line[line.index("Time: ") + 6 :]
            elif line.startswith("StartTime: "):
                job_info["start_time"] = line[11:]
            elif line.startswith("WallTime: "):
                tmpLog.info(line)
        tmpLog.debug(f"checkjob parsing results: {job_info}")
        return job_info

    def translate_status(self, status):
        """Map a MOAB job state to a worker status.

        The comparison is case-insensitive. Any state that is not listed
        below (including an empty string) is reported as finished.

        :param status: MOAB state string
        :return: one of the ``WorkSpec.ST_*`` values
        """
        status = status.lower()
        if status in ("deferred", "hold", "idle", "migrated", "staged"):
            return ws.ST_submitted
        if status in ("starting", "running", "suspended", "canceling"):
            return ws.ST_running
        if status == "completed":
            return ws.ST_finished
        if status == "removed":
            return ws.ST_cancelled
        if status == "vacated":
            return ws.ST_failed
        # unknown states are treated as finished
        return ws.ST_finished

    def get_message(self, exit_code):
        """Return the description of a completion code.

        :param exit_code: completion code (``None`` when checkjob reported none)
        :return: description string, or ``"No message for this code"`` for
                 codes that are not in the table
        """
        return self._EXIT_CODE_MESSAGES.get(exit_code, "No message for this code")

    def fixdate(self, date_str):
        """Turn a year-less MOAB timestamp into a ``datetime`` object.

        The MOAB format has no year field, so the current year is assumed first;
        if that places the timestamp in the future, the previous year is used.
        A fixed offset of 18000 seconds (5 hours) is then added.
        NOTE(review): the offset presumably converts a fixed UTC-5 local time
        to UTC and ignores daylight saving -- confirm.

        :param date_str: timestamp such as ``"Mon Apr 20 07:58:59"``; falsy
                         values are accepted
        :return: naive ``datetime`` with the offset applied, or ``None`` when
                 ``date_str`` is empty
        :raises ValueError: when ``date_str`` does not match the expected format
        """
        tmpLog = self.make_logger(baseLogger, method_name="fixdate")
        if not date_str:
            return None
        tmpLog.debug(f"Date to fix: {date_str}")
        format_str = "%a %b %d %H:%M:%S %Y"
        now = datetime.datetime.now()
        date = datetime.datetime.strptime(f"{date_str} {now.year}", format_str)
        if date > now:
            # Re-parse from the ORIGINAL string: appending a second year to a
            # string that already ends with one can never match format_str.
            date = datetime.datetime.strptime(f"{date_str} {now.year - 1}", format_str)

        tmpLog.debug(f"Full date: {str(date)}")
        utc_offset = datetime.timedelta(0, 18000, 0)
        tmpLog.debug(f"UTC offset: {str(utc_offset)}")
        fixed_date = date + utc_offset
        tmpLog.debug(f"Fixed date: {str(fixed_date)}")

        return fixed_date

    def get_resources(self):
        """Provide the number of nodes and the walltime limit to the worker maker.

        The largest backfill slot that lasts at least ``self.minWalltime``
        seconds is selected and its walltime is reduced by 120 seconds.
        If the slot has fewer than ``self.minNodes`` nodes, zero nodes are
        reported.

        :return: tuple ``(nodes, walltime)``; ``nodes`` is an integer and
                 ``walltime`` is in seconds (``self.minWalltime`` when no
                 slot is long enough)
        """
        tmpLog = self.make_logger(baseLogger, method_name="get_resources")
        tmpLog.info(f"Looking for gap more than '{self.minWalltime}' sec")
        nodes = 0
        walltime = self.minWalltime

        backfill = self.get_backfill()
        if backfill:
            # biggest slot first; stop at the first one that is long enough
            for n in sorted(backfill, reverse=True):
                if backfill[n] >= self.minWalltime:
                    nodes = n
                    walltime = backfill[n] - self._WALLTIME_MARGIN_SEC
                    break

        if nodes < self.minNodes:
            nodes = 0
        tmpLog.info(f"Nodes: {nodes} Walltime: {walltime}")

        return nodes, walltime

    def _walltime_to_seconds(self, duration):
        """Convert a showbf duration (``[[HH:]MM:]SS`` or ``INFINITY``) to capped seconds.

        ``INFINITY`` and durations with four or more colon-separated fields
        are reported as the 24 hour maximum.

        :raises ValueError: when a field is not an integer
        """
        if duration == "INFINITY":
            return self._MAX_WALLTIME_SEC
        parts = duration.split(":")
        if len(parts) >= 4:
            return self._MAX_WALLTIME_SEC
        seconds = 0
        for part in parts:
            seconds = seconds * 60 + int(part)
        return min(seconds, self._MAX_WALLTIME_SEC)

    def get_backfill(self):
        """Run ``showbf`` for ``self.partition`` and collect the available backfill slots.

        The first two output lines are skipped as a header. For each remaining
        row the third column is read as the node count and the fourth as the
        duration. The node count is limited to ``self.maxNodes`` (when set) and
        the walltime is capped by size tier and to 24 hours overall.

        :return: dictionary ``{nodes: walltime_sec}``; when several rows end up
                 with the same node count the last one wins. Empty when
                 ``showbf`` fails or reports nothing.
        """
        tmpLog = self.make_logger(baseLogger, method_name="get_backfill")
        res = {}
        cmd = f"showbf --blocking -p {self.partition}"
        # Text mode: the parsing below compares against str literals, which
        # fails on the bytes that communicate() returns by default.
        p = subprocess.Popen(
            cmd.split(),
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            errors="replace",
        )

        showbf_str, stdErr = p.communicate()
        retCode = p.returncode
        tmpLog.info(f"retCode={retCode}")
        if retCode != 0:
            tmpLog.error(f"showbf failed with errcode: {retCode} stdout: {showbf_str} stderr: {stdErr}")
            return res

        tmpLog.debug(f"Available resources in {self.partition} partition\n{showbf_str}")
        if not showbf_str:
            return res

        tmpLog.info("Fitted resources")
        for line in showbf_str.splitlines()[2:]:
            fields = line.split()
            if len(fields) < 4:
                # blank or truncated row, nothing to parse
                continue
            try:
                nodes = int(fields[2])
                walltime_sec = self._walltime_to_seconds(fields[3])
            except ValueError:
                tmpLog.error(f"Cannot parse showbf line: {line}")
                continue

            if self.maxNodes:
                nodes = min(nodes, self.maxNodes)

            for node_limit, cap_sec in self._WALLTIME_CAPS:
                if nodes < node_limit:
                    walltime_sec = min(walltime_sec, cap_sec)
                    break

            tmpLog.info(f"Nodes: {nodes}, Walltime (str): {fields[3]}, Walltime (sec) {walltime_sec}")

            res[nodes] = walltime_sec

        return res
0247 return res