Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:18

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2025
0010 
0011 import os
0012 import traceback
0013 
0014 import configparser as ConfigParser
0015 
0016 from idds.common.constants import ProcessingStatus
0017 
0018 
0019 class PandaClient(object):
0020 
0021     def __init__(self, *args, **kwargs):
0022         super(PandaClient, self).__init__()
0023         self.load_panda_urls()
0024 
0025     def load_panda_config(self):
0026         panda_config = ConfigParser.ConfigParser()
0027         if os.environ.get("IDDS_PANDA_CONFIG", None):
0028             configfile = os.environ["IDDS_PANDA_CONFIG"]
0029             if panda_config.read(configfile) == [configfile]:
0030                 return panda_config
0031 
0032         configfiles = [
0033             "%s/etc/panda/panda.cfg" % os.environ.get("IDDS_HOME", ""),
0034             "/etc/panda/panda.cfg",
0035             "/opt/idds/etc/panda/panda.cfg",
0036             "%s/etc/panda/panda.cfg" % os.environ.get("VIRTUAL_ENV", ""),
0037         ]
0038         for configfile in configfiles:
0039             if panda_config.read(configfile) == [configfile]:
0040                 return panda_config
0041         return panda_config
0042 
0043     def load_panda_urls(self):
0044         panda_config = self.load_panda_config()
0045         # self.logger.debug("panda config: %s" % panda_config)
0046         self.panda_url = None
0047         self.panda_url_ssl = None
0048         self.panda_monitor = None
0049         self.panda_auth = None
0050         self.panda_auth_vo = None
0051         self.panda_config_root = None
0052         self.pandacache_url = None
0053         self.panda_verify_host = None
0054 
0055         if panda_config.has_section("panda"):
0056             if "PANDA_MONITOR_URL" not in os.environ and panda_config.has_option(
0057                 "panda", "panda_monitor_url"
0058             ):
0059                 self.panda_monitor = panda_config.get("panda", "panda_monitor_url")
0060                 os.environ["PANDA_MONITOR_URL"] = self.panda_monitor
0061                 # self.logger.debug("Panda monitor url: %s" % str(self.panda_monitor))
0062             if "PANDA_URL" not in os.environ and panda_config.has_option(
0063                 "panda", "panda_url"
0064             ):
0065                 self.panda_url = panda_config.get("panda", "panda_url")
0066                 os.environ["PANDA_URL"] = self.panda_url
0067                 # self.logger.debug("Panda url: %s" % str(self.panda_url))
0068             if "PANDACACHE_URL" not in os.environ and panda_config.has_option(
0069                 "panda", "pandacache_url"
0070             ):
0071                 self.pandacache_url = panda_config.get("panda", "pandacache_url")
0072                 os.environ["PANDACACHE_URL"] = self.pandacache_url
0073                 # self.logger.debug("Pandacache url: %s" % str(self.pandacache_url))
0074             if "PANDA_VERIFY_HOST" not in os.environ and panda_config.has_option(
0075                 "panda", "panda_verify_host"
0076             ):
0077                 self.panda_verify_host = panda_config.get("panda", "panda_verify_host")
0078                 os.environ["PANDA_VERIFY_HOST"] = self.panda_verify_host
0079                 # self.logger.debug("Panda verify host: %s" % str(self.panda_verify_host))
0080             if "PANDA_URL_SSL" not in os.environ and panda_config.has_option(
0081                 "panda", "panda_url_ssl"
0082             ):
0083                 self.panda_url_ssl = panda_config.get("panda", "panda_url_ssl")
0084                 os.environ["PANDA_URL_SSL"] = self.panda_url_ssl
0085                 # self.logger.debug("Panda url ssl: %s" % str(self.panda_url_ssl))
0086             if "PANDA_AUTH" not in os.environ and panda_config.has_option(
0087                 "panda", "panda_auth"
0088             ):
0089                 self.panda_auth = panda_config.get("panda", "panda_auth")
0090                 os.environ["PANDA_AUTH"] = self.panda_auth
0091             if "PANDA_AUTH_VO" not in os.environ and panda_config.has_option(
0092                 "panda", "panda_auth_vo"
0093             ):
0094                 self.panda_auth_vo = panda_config.get("panda", "panda_auth_vo")
0095                 os.environ["PANDA_AUTH_VO"] = self.panda_auth_vo
0096             if "PANDA_CONFIG_ROOT" not in os.environ and panda_config.has_option(
0097                 "panda", "panda_config_root"
0098             ):
0099                 self.panda_config_root = panda_config.get("panda", "panda_config_root")
0100                 os.environ["PANDA_CONFIG_ROOT"] = self.panda_config_root
0101 
0102     def submit(self, task_params, logger=None, log_prefix="", parent_workload_id=None):
0103         from pandaclient import Client
0104 
0105         try:
0106             parent_tid = None
0107             if parent_workload_id:
0108                 parent_tid = parent_workload_id
0109                 if logger:
0110                     logger.info(
0111                         log_prefix + "parent_workload_id: %s" % parent_workload_id
0112                     )
0113             return_code = Client.insertTaskParams(
0114                 task_params, verbose=True, parent_tid=parent_tid
0115             )
0116             if return_code[0] == 0 and return_code[1][0] is True:
0117                 try:
0118                     ret_string = str(return_code[1][1])
0119                     ret_string = ret_string.replace("succeeded. new jediTaskID=", "")
0120                     if 'jediTaskID=' in ret_string:
0121                         task_id = int(ret_string.split("=")[1])
0122                     elif "=" in ret_string:
0123                         task_id = int(ret_string.split("=")[1])
0124                     return task_id
0125                 except Exception as ex:
0126                     if logger:
0127                         logger.warn(
0128                             log_prefix
0129                             + "task id is not retruned: (%s) is not task id: %s"  # noqa W503
0130                             % (return_code[1][1], str(ex))
0131                         )
0132                     if return_code[1][1] and "jediTaskID=" in return_code[1][1]:
0133                         parts = return_code[1][1].split(" ")
0134                         for part in parts:
0135                             if "jediTaskID=" in part:
0136                                 task_id = int(part.split("=")[1])
0137                                 return task_id, None
0138                     else:
0139                         raise Exception(return_code)
0140             else:
0141                 if logger:
0142                     logger.warn(
0143                         log_prefix
0144                         + "submit_panda_task, return_code: %s"  # noqa W503
0145                         % str(return_code)  # noqa W503
0146                     )
0147                 raise Exception(return_code)
0148         except Exception as ex:
0149             if logger:
0150                 logger.error(log_prefix + str(ex))
0151                 logger.error(traceback.format_exc())
0152             raise ex
0153 
0154     def get_processing_status(self, task_status):
0155         if task_status in ["registered", "defined", "assigning"]:
0156             processing_status = ProcessingStatus.Submitting
0157         elif task_status in [
0158             "ready",
0159             "scouting",
0160             "scouted",
0161             "prepared",
0162             "topreprocess",
0163             "preprocessing",
0164         ]:
0165             processing_status = ProcessingStatus.Submitting
0166         elif task_status in ["pending"]:
0167             processing_status = ProcessingStatus.Submitted
0168         elif task_status in ["running", "toretry", "toincexec", "throttled"]:
0169             processing_status = ProcessingStatus.Running
0170         elif task_status in ["done"]:
0171             processing_status = ProcessingStatus.Finished
0172         elif task_status in ["finished", "paused"]:
0173             # finished, finishing, waiting it to be done
0174             processing_status = ProcessingStatus.SubFinished
0175         elif task_status in ["failed", "exhausted"]:
0176             # aborting, tobroken
0177             processing_status = ProcessingStatus.Failed
0178         elif task_status in ["aborted"]:
0179             # aborting, tobroken
0180             processing_status = ProcessingStatus.Cancelled
0181         elif task_status in ["broken"]:
0182             processing_status = ProcessingStatus.Broken
0183         else:
0184             # finished, finishing, aborting, topreprocess, preprocessing, tobroken
0185             # toretry, toincexec, rerefine, paused, throttled, passed
0186             processing_status = ProcessingStatus.Submitted
0187         return processing_status
0188 
0189     def poll(self, workload_id, logger=None, log_prefix=""):
0190         from pandaclient import Client
0191 
0192         try:
0193             status, task_status = Client.getTaskStatus(workload_id)
0194             if status == 0:
0195                 return self.get_processing_status(task_status)
0196             else:
0197                 msg = "Failed to poll task %s: status: %s, task_status: %s" % (
0198                     workload_id,
0199                     status,
0200                     task_status,
0201                 )
0202                 raise Exception(msg)
0203         except Exception as ex:
0204             if logger:
0205                 logger.error(log_prefix + str(ex))
0206                 logger.error(traceback.format_exc())
0207             raise ex
0208 
0209     def close(self, workload_id, soft=False, logger=None, log_prefix=""):
0210         from pandaclient import Client
0211 
0212         try:
0213             if logger:
0214                 logger.info(log_prefix + f"aborting task {workload_id}")
0215             Client.finishTask(workload_id, soft=soft)
0216             status, task_status = Client.getTaskStatus(workload_id)
0217             if status == 0:
0218                 return self.get_processing_status(task_status)
0219             else:
0220                 msg = "Failed to abort task %s: status: %s, task_status: %s" % (
0221                     workload_id,
0222                     status,
0223                     task_status,
0224                 )
0225                 raise Exception(msg)
0226         except Exception as ex:
0227             if logger:
0228                 logger.error(log_prefix + str(ex))
0229                 logger.error(traceback.format_exc())
0230             raise ex
0231 
0232     def resume(self, workload_id, logger=None, log_prefix=""):
0233         from pandaclient import Client
0234 
0235         try:
0236             if logger:
0237                 logger.info(log_prefix + f"resuming task {workload_id}")
0238             status, out = Client.retryTask(workload_id, newParams={})
0239             return ProcessingStatus.Running
0240         except Exception as ex:
0241             if logger:
0242                 logger.error(log_prefix + str(ex))
0243                 logger.error(traceback.format_exc())
0244             raise ex