File indexing completed on 2026-04-09 07:58:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import os
0012 import traceback
0013
0014 import configparser as ConfigParser
0015
0016 from idds.common.constants import ProcessingStatus
0017
0018
0019 class PandaClient(object):
0020
0021 def __init__(self, *args, **kwargs):
0022 super(PandaClient, self).__init__()
0023 self.load_panda_urls()
0024
0025 def load_panda_config(self):
0026 panda_config = ConfigParser.ConfigParser()
0027 if os.environ.get("IDDS_PANDA_CONFIG", None):
0028 configfile = os.environ["IDDS_PANDA_CONFIG"]
0029 if panda_config.read(configfile) == [configfile]:
0030 return panda_config
0031
0032 configfiles = [
0033 "%s/etc/panda/panda.cfg" % os.environ.get("IDDS_HOME", ""),
0034 "/etc/panda/panda.cfg",
0035 "/opt/idds/etc/panda/panda.cfg",
0036 "%s/etc/panda/panda.cfg" % os.environ.get("VIRTUAL_ENV", ""),
0037 ]
0038 for configfile in configfiles:
0039 if panda_config.read(configfile) == [configfile]:
0040 return panda_config
0041 return panda_config
0042
0043 def load_panda_urls(self):
0044 panda_config = self.load_panda_config()
0045
0046 self.panda_url = None
0047 self.panda_url_ssl = None
0048 self.panda_monitor = None
0049 self.panda_auth = None
0050 self.panda_auth_vo = None
0051 self.panda_config_root = None
0052 self.pandacache_url = None
0053 self.panda_verify_host = None
0054
0055 if panda_config.has_section("panda"):
0056 if "PANDA_MONITOR_URL" not in os.environ and panda_config.has_option(
0057 "panda", "panda_monitor_url"
0058 ):
0059 self.panda_monitor = panda_config.get("panda", "panda_monitor_url")
0060 os.environ["PANDA_MONITOR_URL"] = self.panda_monitor
0061
0062 if "PANDA_URL" not in os.environ and panda_config.has_option(
0063 "panda", "panda_url"
0064 ):
0065 self.panda_url = panda_config.get("panda", "panda_url")
0066 os.environ["PANDA_URL"] = self.panda_url
0067
0068 if "PANDACACHE_URL" not in os.environ and panda_config.has_option(
0069 "panda", "pandacache_url"
0070 ):
0071 self.pandacache_url = panda_config.get("panda", "pandacache_url")
0072 os.environ["PANDACACHE_URL"] = self.pandacache_url
0073
0074 if "PANDA_VERIFY_HOST" not in os.environ and panda_config.has_option(
0075 "panda", "panda_verify_host"
0076 ):
0077 self.panda_verify_host = panda_config.get("panda", "panda_verify_host")
0078 os.environ["PANDA_VERIFY_HOST"] = self.panda_verify_host
0079
0080 if "PANDA_URL_SSL" not in os.environ and panda_config.has_option(
0081 "panda", "panda_url_ssl"
0082 ):
0083 self.panda_url_ssl = panda_config.get("panda", "panda_url_ssl")
0084 os.environ["PANDA_URL_SSL"] = self.panda_url_ssl
0085
0086 if "PANDA_AUTH" not in os.environ and panda_config.has_option(
0087 "panda", "panda_auth"
0088 ):
0089 self.panda_auth = panda_config.get("panda", "panda_auth")
0090 os.environ["PANDA_AUTH"] = self.panda_auth
0091 if "PANDA_AUTH_VO" not in os.environ and panda_config.has_option(
0092 "panda", "panda_auth_vo"
0093 ):
0094 self.panda_auth_vo = panda_config.get("panda", "panda_auth_vo")
0095 os.environ["PANDA_AUTH_VO"] = self.panda_auth_vo
0096 if "PANDA_CONFIG_ROOT" not in os.environ and panda_config.has_option(
0097 "panda", "panda_config_root"
0098 ):
0099 self.panda_config_root = panda_config.get("panda", "panda_config_root")
0100 os.environ["PANDA_CONFIG_ROOT"] = self.panda_config_root
0101
0102 def submit(self, task_params, logger=None, log_prefix="", parent_workload_id=None):
0103 from pandaclient import Client
0104
0105 try:
0106 parent_tid = None
0107 if parent_workload_id:
0108 parent_tid = parent_workload_id
0109 if logger:
0110 logger.info(
0111 log_prefix + "parent_workload_id: %s" % parent_workload_id
0112 )
0113 return_code = Client.insertTaskParams(
0114 task_params, verbose=True, parent_tid=parent_tid
0115 )
0116 if return_code[0] == 0 and return_code[1][0] is True:
0117 try:
0118 ret_string = str(return_code[1][1])
0119 ret_string = ret_string.replace("succeeded. new jediTaskID=", "")
0120 if 'jediTaskID=' in ret_string:
0121 task_id = int(ret_string.split("=")[1])
0122 elif "=" in ret_string:
0123 task_id = int(ret_string.split("=")[1])
0124 return task_id
0125 except Exception as ex:
0126 if logger:
0127 logger.warn(
0128 log_prefix
0129 + "task id is not retruned: (%s) is not task id: %s"
0130 % (return_code[1][1], str(ex))
0131 )
0132 if return_code[1][1] and "jediTaskID=" in return_code[1][1]:
0133 parts = return_code[1][1].split(" ")
0134 for part in parts:
0135 if "jediTaskID=" in part:
0136 task_id = int(part.split("=")[1])
0137 return task_id, None
0138 else:
0139 raise Exception(return_code)
0140 else:
0141 if logger:
0142 logger.warn(
0143 log_prefix
0144 + "submit_panda_task, return_code: %s"
0145 % str(return_code)
0146 )
0147 raise Exception(return_code)
0148 except Exception as ex:
0149 if logger:
0150 logger.error(log_prefix + str(ex))
0151 logger.error(traceback.format_exc())
0152 raise ex
0153
0154 def get_processing_status(self, task_status):
0155 if task_status in ["registered", "defined", "assigning"]:
0156 processing_status = ProcessingStatus.Submitting
0157 elif task_status in [
0158 "ready",
0159 "scouting",
0160 "scouted",
0161 "prepared",
0162 "topreprocess",
0163 "preprocessing",
0164 ]:
0165 processing_status = ProcessingStatus.Submitting
0166 elif task_status in ["pending"]:
0167 processing_status = ProcessingStatus.Submitted
0168 elif task_status in ["running", "toretry", "toincexec", "throttled"]:
0169 processing_status = ProcessingStatus.Running
0170 elif task_status in ["done"]:
0171 processing_status = ProcessingStatus.Finished
0172 elif task_status in ["finished", "paused"]:
0173
0174 processing_status = ProcessingStatus.SubFinished
0175 elif task_status in ["failed", "exhausted"]:
0176
0177 processing_status = ProcessingStatus.Failed
0178 elif task_status in ["aborted"]:
0179
0180 processing_status = ProcessingStatus.Cancelled
0181 elif task_status in ["broken"]:
0182 processing_status = ProcessingStatus.Broken
0183 else:
0184
0185
0186 processing_status = ProcessingStatus.Submitted
0187 return processing_status
0188
0189 def poll(self, workload_id, logger=None, log_prefix=""):
0190 from pandaclient import Client
0191
0192 try:
0193 status, task_status = Client.getTaskStatus(workload_id)
0194 if status == 0:
0195 return self.get_processing_status(task_status)
0196 else:
0197 msg = "Failed to poll task %s: status: %s, task_status: %s" % (
0198 workload_id,
0199 status,
0200 task_status,
0201 )
0202 raise Exception(msg)
0203 except Exception as ex:
0204 if logger:
0205 logger.error(log_prefix + str(ex))
0206 logger.error(traceback.format_exc())
0207 raise ex
0208
0209 def close(self, workload_id, soft=False, logger=None, log_prefix=""):
0210 from pandaclient import Client
0211
0212 try:
0213 if logger:
0214 logger.info(log_prefix + f"aborting task {workload_id}")
0215 Client.finishTask(workload_id, soft=soft)
0216 status, task_status = Client.getTaskStatus(workload_id)
0217 if status == 0:
0218 return self.get_processing_status(task_status)
0219 else:
0220 msg = "Failed to abort task %s: status: %s, task_status: %s" % (
0221 workload_id,
0222 status,
0223 task_status,
0224 )
0225 raise Exception(msg)
0226 except Exception as ex:
0227 if logger:
0228 logger.error(log_prefix + str(ex))
0229 logger.error(traceback.format_exc())
0230 raise ex
0231
0232 def resume(self, workload_id, logger=None, log_prefix=""):
0233 from pandaclient import Client
0234
0235 try:
0236 if logger:
0237 logger.info(log_prefix + f"resuming task {workload_id}")
0238 status, out = Client.retryTask(workload_id, newParams={})
0239 return ProcessingStatus.Running
0240 except Exception as ex:
0241 if logger:
0242 logger.error(log_prefix + str(ex))
0243 logger.error(traceback.format_exc())
0244 raise ex