File indexing completed on 2026-04-10 07:58:40
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 try:
0013 import ConfigParser
0014 except ImportError:
0015 import configparser as ConfigParser
0016
0017 import os
0018 import traceback
0019
0020 from idds.common.constants import ProcessingStatus
0021 from .base import BaseSubmitterPoller
0022
0023
class PandaSubmitterPoller(BaseSubmitterPoller):
    """Submit, poll, abort and resume PanDA tasks through ``pandaclient``.

    On construction the PanDA configuration file (if one is found) is read
    and its ``[panda]`` options are exported as environment variables,
    without overriding variables that are already set.
    """

    # (instance attribute, option name in the [panda] section, environment variable).
    # The order matches the order in which the variables are exported.
    _PANDA_OPTIONS = (
        ('panda_monitor', 'panda_monitor_url', 'PANDA_MONITOR_URL'),
        ('panda_url', 'panda_url', 'PANDA_URL'),
        ('pandacache_url', 'pandacache_url', 'PANDACACHE_URL'),
        ('panda_verify_host', 'panda_verify_host', 'PANDA_VERIFY_HOST'),
        ('panda_url_ssl', 'panda_url_ssl', 'PANDA_URL_SSL'),
        ('panda_auth', 'panda_auth', 'PANDA_AUTH'),
        ('panda_auth_vo', 'panda_auth_vo', 'PANDA_AUTH_VO'),
        ('panda_config_root', 'panda_config_root', 'PANDA_CONFIG_ROOT'),
    )

    def __init__(self, *args, **kwargs):
        """Initialise the plugin and load the PanDA settings.

        Positional and keyword arguments are accepted but are not forwarded
        to the base class initialiser.
        """
        super(PandaSubmitterPoller, self).__init__()
        self.load_panda_urls()

    def load_panda_config(self):
        """Return a ConfigParser loaded from the first readable PanDA config file.

        The file named by ``IDDS_PANDA_CONFIG`` (when set and non-empty) is
        tried first, followed by ``etc/panda/panda.cfg`` under ``IDDS_HOME``,
        ``/etc/panda/panda.cfg``, ``/opt/idds/etc/panda/panda.cfg`` and
        ``etc/panda/panda.cfg`` under ``VIRTUAL_ENV``. If none of them can be
        read, an empty parser is returned.
        """
        panda_config = ConfigParser.ConfigParser()

        candidates = []
        if os.environ.get('IDDS_PANDA_CONFIG', None):
            candidates.append(os.environ['IDDS_PANDA_CONFIG'])
        candidates.extend([
            '%s/etc/panda/panda.cfg' % os.environ.get('IDDS_HOME', ''),
            '/etc/panda/panda.cfg',
            '/opt/idds/etc/panda/panda.cfg',
            '%s/etc/panda/panda.cfg' % os.environ.get('VIRTUAL_ENV', ''),
        ])

        for configfile in candidates:
            # read() returns the list of files it parsed successfully.
            if panda_config.read(configfile) == [configfile]:
                return panda_config
        return panda_config

    def load_panda_urls(self):
        """Load PanDA settings into instance attributes and ``os.environ``.

        Every attribute starts as ``None``. For each option present in the
        ``[panda]`` section whose environment variable is not already set,
        the value is stored on the instance and exported to the environment.
        When the environment variable already exists, it is left untouched
        and the corresponding attribute stays ``None``.
        """
        panda_config = self.load_panda_config()

        self.panda_url = None
        self.panda_url_ssl = None
        self.panda_monitor = None
        self.panda_auth = None
        self.panda_auth_vo = None
        self.panda_config_root = None
        self.pandacache_url = None
        self.panda_verify_host = None

        if not panda_config.has_section('panda'):
            return

        for attr_name, option, env_name in self._PANDA_OPTIONS:
            if env_name in os.environ:
                continue
            if not panda_config.has_option('panda', option):
                continue
            value = panda_config.get('panda', option)
            setattr(self, attr_name, value)
            os.environ[env_name] = value

    @staticmethod
    def _log_failure(logger, log_prefix, ex):
        """Log *ex* and the active traceback; must be called from an ``except`` block."""
        if logger:
            logger.error(log_prefix + str(ex))
            logger.error(traceback.format_exc())

    @staticmethod
    def _parse_task_id(message):
        """Extract an integer task id from a submission message.

        Recognised forms are ``...jediTaskID=<digits>...``, ``<key>=<id>``
        and a bare integer. Returns ``None`` when no id can be extracted.
        """
        import re

        text = str(message)
        found = re.search(r'jediTaskID=(\d+)', text)
        if found:
            return int(found.group(1))

        candidate = text.split("=")[1] if "=" in text else text
        try:
            return int(candidate.strip())
        except ValueError:
            return None

    def submit(self, work, logger=None, log_prefix=''):
        """Submit *work* as a PanDA task.

        :param work: object providing ``parent_workload_id``; it is also
                     passed to ``self.get_task_params`` to build the task parameters.
        :param logger: optional logger; nothing is logged when it is ``None``.
        :param log_prefix: string prepended to log messages.

        :returns: tuple ``(task_id, None)`` on success.
        :raises Exception: when the submission is rejected, when no task id
                           can be extracted from the reply, or when the client call fails.
        """
        from pandaclient import Client

        task_params = self.get_task_params(work)
        try:
            if logger:
                logger.info(log_prefix + "parent_workload_id: %s" % work.parent_workload_id)
            # A falsy parent id (None, 0, '') means "no parent task".
            parent_tid = work.parent_workload_id or None

            return_code = Client.insertTaskParams(task_params, verbose=True, parent_tid=parent_tid)
            if return_code[0] == 0 and return_code[1][0] is True:
                task_id = self._parse_task_id(return_code[1][1])
                if task_id is None:
                    if logger:
                        logger.warning(log_prefix + "task id is not returned: (%s) is not task id" % (return_code[1][1],))
                    raise Exception(return_code)
                return task_id, None

            if logger:
                logger.warning(log_prefix + "submit_panda_task, return_code: %s" % str(return_code))
            raise Exception(return_code)
        except Exception as ex:
            self._log_failure(logger, log_prefix, ex)
            raise

    def get_processing_status(self, task_status):
        """Map a PanDA task status string to a ``ProcessingStatus`` value.

        Statuses that are not listed explicitly map to ``ProcessingStatus.Submitted``.
        """
        if task_status in ('registered', 'defined', 'assigning',
                           'ready', 'scouting', 'scouted', 'prepared',
                           'topreprocess', 'preprocessing'):
            return ProcessingStatus.Submitting
        if task_status in ('pending',):
            return ProcessingStatus.Submitted
        if task_status in ('running', 'toretry', 'toincexec', 'throttled'):
            return ProcessingStatus.Running
        if task_status in ('done',):
            return ProcessingStatus.Finished
        if task_status in ('finished', 'paused'):
            return ProcessingStatus.SubFinished
        if task_status in ('failed', 'exhausted'):
            return ProcessingStatus.Failed
        if task_status in ('aborted',):
            return ProcessingStatus.Cancelled
        if task_status in ('broken',):
            return ProcessingStatus.Broken
        # Unknown status: fall back to Submitted.
        return ProcessingStatus.Submitted

    def poll(self, workload_id, logger=None, log_prefix=''):
        """Return the ``ProcessingStatus`` of task *workload_id*.

        :raises Exception: when ``Client.getTaskStatus`` reports a non-zero
                           status or the client call fails.
        """
        from pandaclient import Client

        try:
            status, task_status = Client.getTaskStatus(workload_id)
            if status != 0:
                msg = "Failed to poll task %s: status: %s, task_status: %s" % (workload_id, status, task_status)
                raise Exception(msg)
            return self.get_processing_status(task_status)
        except Exception as ex:
            self._log_failure(logger, log_prefix, ex)
            raise

    def abort(self, workload_id, logger=None, log_prefix=''):
        """Request a soft kill of task *workload_id* and return its status afterwards.

        :raises Exception: when the follow-up ``Client.getTaskStatus`` reports
                           a non-zero status or a client call fails.
        """
        from pandaclient import Client

        try:
            if logger:
                logger.info(log_prefix + f"aborting task {workload_id}")
            Client.killTask(workload_id, soft=True)
            status, task_status = Client.getTaskStatus(workload_id)
            if status != 0:
                msg = "Failed to abort task %s: status: %s, task_status: %s" % (workload_id, status, task_status)
                raise Exception(msg)
            return self.get_processing_status(task_status)
        except Exception as ex:
            self._log_failure(logger, log_prefix, ex)
            raise

    def resume(self, workload_id, logger=None, log_prefix=''):
        """Ask PanDA to retry task *workload_id*.

        :returns: ``ProcessingStatus.Running`` whenever the client call does
                  not raise; the status returned by ``Client.retryTask`` is
                  only logged.
        :raises Exception: when the client call fails.
        """
        from pandaclient import Client

        try:
            if logger:
                logger.info(log_prefix + f"resuming task {workload_id}")
            status, out = Client.retryTask(workload_id, newParams={})
            # NOTE(review): the result does not influence the returned status;
            # confirm whether a non-zero status should be treated as a failure.
            if logger:
                logger.info(log_prefix + f"retryTask returned status: {status}, out: {out}")
            return ProcessingStatus.Running
        except Exception as ex:
            self._log_failure(logger, log_prefix, ex)
            raise