Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 07:58:40

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024 - 2025
0010 
0011 
0012 try:
0013     import ConfigParser
0014 except ImportError:
0015     import configparser as ConfigParser
0016 
0017 import os
0018 import traceback
0019 
0020 from idds.common.constants import ProcessingStatus
0021 from .base import BaseSubmitterPoller
0022 
0023 
class PandaSubmitterPoller(BaseSubmitterPoller):
    """Submit, poll, abort and resume PanDA tasks through ``pandaclient``.

    When an instance is created, PanDA client settings found in a
    ``panda.cfg`` file are exported to ``os.environ`` (only for variables
    that are not already set there).
    """

    # (environment variable, option in the [panda] section, instance attribute)
    _PANDA_SETTINGS = (
        ('PANDA_MONITOR_URL', 'panda_monitor_url', 'panda_monitor'),
        ('PANDA_URL', 'panda_url', 'panda_url'),
        ('PANDACACHE_URL', 'pandacache_url', 'pandacache_url'),
        ('PANDA_VERIFY_HOST', 'panda_verify_host', 'panda_verify_host'),
        ('PANDA_URL_SSL', 'panda_url_ssl', 'panda_url_ssl'),
        ('PANDA_AUTH', 'panda_auth', 'panda_auth'),
        ('PANDA_AUTH_VO', 'panda_auth_vo', 'panda_auth_vo'),
        ('PANDA_CONFIG_ROOT', 'panda_config_root', 'panda_config_root'),
    )

    def __init__(self, *args, **kwargs):
        """Initialise the base class and load the PanDA settings."""
        # NOTE(review): *args/**kwargs are accepted but not forwarded to the
        # base class (unchanged behaviour) -- confirm whether
        # BaseSubmitterPoller.__init__ is expected to receive them.
        super(PandaSubmitterPoller, self).__init__()
        self.load_panda_urls()

    def load_panda_config(self):
        """Return a config parser filled from the first readable panda.cfg.

        Candidates are tried in this order:

        1. the path in ``$IDDS_PANDA_CONFIG`` (when set and non-empty),
        2. ``$IDDS_HOME/etc/panda/panda.cfg``,
        3. ``/etc/panda/panda.cfg``,
        4. ``/opt/idds/etc/panda/panda.cfg``,
        5. ``$VIRTUAL_ENV/etc/panda/panda.cfg``.

        :returns: the parser; it is returned as-is (possibly without any
                  section) when none of the candidates could be read.
        """
        panda_config = ConfigParser.ConfigParser()

        candidates = []
        explicit = os.environ.get('IDDS_PANDA_CONFIG', None)
        if explicit:
            candidates.append(explicit)
        candidates.extend([
            '%s/etc/panda/panda.cfg' % os.environ.get('IDDS_HOME', ''),
            '/etc/panda/panda.cfg',
            '/opt/idds/etc/panda/panda.cfg',
            '%s/etc/panda/panda.cfg' % os.environ.get('VIRTUAL_ENV', ''),
        ])

        for configfile in candidates:
            # read() returns the list of files it successfully parsed, so an
            # exact match means this candidate was loaded: stop searching.
            if panda_config.read(configfile) == [configfile]:
                break
        return panda_config

    def load_panda_urls(self):
        """Export PanDA settings from the config file to the environment.

        Every ``panda_*`` attribute is first reset to ``None``.  Then, for
        each known setting, if the environment variable is *not* already
        defined and the ``[panda]`` section provides the option, the value is
        stored on the instance and written to ``os.environ``.  Variables that
        are already present in the environment are left untouched, and the
        matching attribute stays ``None``.
        """
        panda_config = self.load_panda_config()

        self.panda_url = None
        self.panda_url_ssl = None
        self.panda_monitor = None
        self.panda_auth = None
        self.panda_auth_vo = None
        self.panda_config_root = None
        self.pandacache_url = None
        self.panda_verify_host = None

        if not panda_config.has_section('panda'):
            return

        for env_name, option, attr_name in self._PANDA_SETTINGS:
            if env_name in os.environ:
                # an explicit environment setting always wins over the file
                continue
            if not panda_config.has_option('panda', option):
                continue
            value = panda_config.get('panda', option)
            setattr(self, attr_name, value)
            os.environ[env_name] = value

    @staticmethod
    def _parse_task_id(message):
        """Extract the integer task id from a PanDA reply message.

        :param message: reply payload, e.g.
                        ``"succeeded. new jediTaskID=1234"``; any object is
                        accepted and converted with ``str()``.
        :returns: the task id as ``int``, or ``None`` when none can be found.
        """
        import re

        text = '' if message is None else str(message)

        found = re.search(r'jediTaskID=(\d+)', text)
        if found:
            return int(found.group(1))

        # Legacy "<key>=<integer>" replies were accepted before; keep that.
        if '=' in text:
            try:
                return int(text.split('=')[1])
            except ValueError:
                return None
        return None

    def submit(self, work, logger=None, log_prefix=''):
        """Submit ``work`` to PanDA as a new task.

        :param work: work object; it is handed to ``self.get_task_params``
                     and its ``parent_workload_id`` is used as parent task id.
        :param logger: optional logger; nothing is logged when it is ``None``.
        :param log_prefix: text prepended to warning and error messages.
        :returns: tuple ``(task_id, None)``.
        :raises Exception: when the request fails, PanDA rejects the task, or
                           no task id can be extracted from the reply.
        """
        from pandaclient import Client

        task_params = self.get_task_params(work)
        try:
            if logger:
                logger.info("parent_workload_id: %s" % work.parent_workload_id)
            # a falsy parent id (None, 0, '') means "no parent task"
            parent_tid = work.parent_workload_id or None

            return_code = Client.insertTaskParams(task_params, verbose=True, parent_tid=parent_tid)
            accepted = return_code[0] == 0 and return_code[1][0] is True
            if not accepted:
                if logger:
                    logger.warning(log_prefix + "submit_panda_task, return_code: %s" % str(return_code))
                raise Exception(return_code)

            task_id = self._parse_task_id(return_code[1][1])
            if task_id is None:
                if logger:
                    logger.warning(log_prefix + "task id is not returned: (%s) is not task id" % str(return_code[1][1]))
                raise Exception(return_code)
            return task_id, None
        except Exception as ex:
            if logger:
                logger.error(log_prefix + str(ex))
                logger.error(traceback.format_exc())
            raise

    def get_processing_status(self, task_status):
        """Translate a PanDA task status string into a ``ProcessingStatus``.

        :param task_status: status string as returned by PanDA.
        :returns: the matching ``ProcessingStatus``; any status not listed
                  below maps to ``ProcessingStatus.Submitted``.
        """
        if task_status in ('registered', 'defined', 'assigning',
                           'ready', 'scouting', 'scouted', 'prepared',
                           'topreprocess', 'preprocessing'):
            return ProcessingStatus.Submitting
        if task_status in ('pending',):
            return ProcessingStatus.Submitted
        if task_status in ('running', 'toretry', 'toincexec', 'throttled'):
            return ProcessingStatus.Running
        if task_status in ('done',):
            return ProcessingStatus.Finished
        if task_status in ('finished', 'paused'):
            # finished, finishing, waiting it to be done
            return ProcessingStatus.SubFinished
        if task_status in ('failed', 'exhausted'):
            return ProcessingStatus.Failed
        if task_status in ('aborted',):
            return ProcessingStatus.Cancelled
        if task_status in ('broken',):
            return ProcessingStatus.Broken
        # everything else (e.g. finishing, aborting, tobroken, rerefine,
        # passed) is reported as Submitted
        return ProcessingStatus.Submitted

    def poll(self, workload_id, logger=None, log_prefix=''):
        """Query PanDA for the current status of a task.

        :param workload_id: task id passed to ``Client.getTaskStatus``.
        :param logger: optional logger used to report failures.
        :param log_prefix: text prepended to error messages.
        :returns: the ``ProcessingStatus`` for the reported task status.
        :raises Exception: when the client reports a non-zero status or the
                           call itself fails.
        """
        from pandaclient import Client

        try:
            status, task_status = Client.getTaskStatus(workload_id)
            if status != 0:
                msg = "Failed to poll task %s: status: %s, task_status: %s" % (workload_id, status, task_status)
                raise Exception(msg)
            return self.get_processing_status(task_status)
        except Exception as ex:
            if logger:
                logger.error(log_prefix + str(ex))
                logger.error(traceback.format_exc())
            raise

    def abort(self, workload_id, logger=None, log_prefix=''):
        """Request a soft kill of a task and report its resulting status.

        :param workload_id: task id passed to the PanDA client.
        :param logger: optional logger.
        :param log_prefix: text prepended to log messages.
        :returns: the ``ProcessingStatus`` read back after the kill request.
        :raises Exception: when the status query reports a non-zero status or
                           any client call fails.
        """
        from pandaclient import Client

        try:
            if logger:
                logger.info(log_prefix + f"aborting task {workload_id}")
            # The kill call's own return value is not inspected; the task
            # state is re-read right afterwards instead.
            Client.killTask(workload_id, soft=True)
            status, task_status = Client.getTaskStatus(workload_id)
            if status != 0:
                msg = "Failed to abort task %s: status: %s, task_status: %s" % (workload_id, status, task_status)
                raise Exception(msg)
            return self.get_processing_status(task_status)
        except Exception as ex:
            if logger:
                logger.error(log_prefix + str(ex))
                logger.error(traceback.format_exc())
            raise

    def resume(self, workload_id, logger=None, log_prefix=''):
        """Ask PanDA to retry a task.

        :param workload_id: task id passed to ``Client.retryTask``.
        :param logger: optional logger.
        :param log_prefix: text prepended to log messages.
        :returns: ``ProcessingStatus.Running`` once the retry request went
                  through.
        :raises Exception: when the client reports a non-zero status or the
                           call itself fails.
        """
        from pandaclient import Client

        try:
            if logger:
                logger.info(log_prefix + f"resuming task {workload_id}")
            status, out = Client.retryTask(workload_id, newParams={})
            # Same contract as poll()/abort(): a non-zero client status means
            # the request did not succeed, so do not pretend it is running.
            if status != 0:
                msg = "Failed to resume task %s: status: %s, output: %s" % (workload_id, status, out)
                raise Exception(msg)
            return ProcessingStatus.Running
        except Exception as ex:
            if logger:
                logger.error(log_prefix + str(ex))
                logger.error(traceback.format_exc())
            raise