Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 #! /usr/bin/env python
0002 #
0003 # Dump schedconfig on a per-queue basis into cache files
0004 #
0005 #
0006 
0007 import json
0008 import os
0009 import shutil
0010 import sys
0011 import traceback
0012 from argparse import ArgumentParser
0013 from copy import deepcopy
0014 from pathlib import Path
0015 
0016 from pandacommon.pandalogger.PandaLogger import PandaLogger
0017 from pandacommon.pandautils.thread_utils import GenericThread
0018 
0019 from pandaserver.config import panda_config
0020 
# module-level logger used by the cacher class and by main()
_logger = PandaLogger().getLogger("cache_schedconfig")

# directory the cache files are written to unless -o/--output overrides it;
# read from the server configuration, with a fixed fallback path
default_dest_dir = getattr(
    panda_config,
    "schedconfig_cache_dir",
    "/var/cache/pandaserver/schedconfig",
)
0026 
0027 
class cacheSchedConfig:
    """
    Class to dump schedconfig on a per-queue basis into cache files

    Queue records are read through the task buffer given to the constructor
    (only ``querySQL`` and ``getConfigValue`` are called on it) and written
    below a destination directory: one file per queue, output set and format,
    plus the aggregated ``schedconfig.all.json`` and ``pilot_gdp_config.json``.
    """

    def __init__(self, tbuf):
        """
        :param tbuf: task buffer object used for all database access
        """
        self.tbuf = tbuf
        # list of per-queue dictionaries, filled by getQueueData()
        self.queueData = None
        # kept for compatibility; no method of this class reads or writes it
        self.cloudStatus = None
        # Define this here, but could be more flexible...
        self.queueDataFields = {
            # Note that json dumps always use sort_keys=True; for pilot format
            # the order defined here is respected
            "pilot": [
                "appdir",
                "allowdirectaccess",
                "cloud",
                "datadir",
                "dq2url",
                "copytool",
                "copytoolin",
                "copysetup",
                "copysetupin",
                "ddm",
                "se",
                "sepath",
                "seprodpath",
                "envsetup",
                "envsetupin",
                "region",
                "copyprefix",
                "copyprefixin",
                "lfcpath",
                "lfcprodpath",
                "lfchost",
                "lfcregister",
                "sein",
                "wntmpdir",
                "proxy",
                "retry",
                "recoverdir",
                "space",
                "memory",
                "cmtconfig",
                "status",
                "setokens",
                "glexec",
                "seopt",
                "gatekeeper",
                "pcache",
                "maxinputsize",
                "timefloor",
                "corecount",
                "faxredirector",
                "allowfax",
                "maxtime",
                "maxwdir",
            ],
            "factory": [
                "site",
                "siteid",
                "nickname",
                "cloud",
                "status",
                "jdl",
                "queue",
                "localqueue",
                "nqueue",
                "environ",
                "proxy",
                "glexec",
                "depthboost",
                "idlepilotsupression",
                "pilotlimit",
                "transferringlimit",
                "memory",
                "maxtime",
                "system",
                "fairsharepolicy",
                "autosetup_pre",
                "autosetup_post",
            ],
            # None is magic here and really means "all"
            "all": None,
        }

    def query_column_sql(self, sql, varMap=None, arraySize=100):
        """
        Run a query returning (panda_queue, data) rows and decode each row.

        :param sql: SQL statement passed to ``tbuf.querySQL``
        :param varMap: bind variables for the statement, or None
        :param arraySize: forwarded to ``tbuf.querySQL`` as ``arraySize``
        :return: list of dictionaries, one per row, each with ``siteid`` set
                 to the row's panda_queue value
        """
        rows = self.tbuf.querySQL(sql, varMap, arraySize=arraySize)
        decoded = []
        for panda_queue, data in rows:
            if isinstance(data, str):
                record = json.loads(data)
            elif isinstance(data, dict):
                record = data
            else:
                # anything else is expected to offer read() returning the JSON text
                record = json.loads(data.read())
            record["siteid"] = panda_queue
            decoded.append(record)
        return decoded

    def getQueueData(self, site=None, queue=None):
        """
        Load queue records from SCHEDCONFIG_JSON into ``self.queueData``.

        :param site: when set, restrict the query to this panda_queue
        :param queue: when set (and site is not), restrict to this panda_queue
        """
        # Dump schedconfig in a single query (it's not very big)
        sql = f"SELECT panda_queue, data from {panda_config.schemaPANDA}.SCHEDCONFIG_JSON"
        if site:
            sql += " where panda_queue=:site"
            var_map = {":site": site}
        elif queue:
            sql += " where panda_queue=:queue"
            var_map = {":queue": queue}
        else:
            var_map = None
        self.queueData = self.query_column_sql(sql, var_map)

    def dumpSingleQueue(self, queueDict, dest="/tmp", outputSet="all", format="txt"):
        """
        Write one queue to ``<dest>/<nickname>.<outputSet>.<format>``.

        When the queue's ``siteid`` differs from its ``nickname`` the file is
        also copied to ``<dest>/<siteid>.<outputSet>.<format>``.

        :param queueDict: queue record; must contain ``nickname`` and ``siteid``
        :param dest: destination directory
        :param outputSet: key of ``self.queueDataFields`` selecting the fields
        :param format: ``txt`` (one ``key=value`` per line), ``pilot``
                       (``key=value`` joined with ``|``) or ``json``; any other
                       value produces an empty file
        :raises KeyError: if ``nickname``/``siteid`` is missing or outputSet is unknown
        """
        suffix = "." + outputSet + "." + format
        out_path = os.path.join(dest, queueDict["nickname"] + suffix)
        # resolve the field list before opening so a bad outputSet leaves no empty file behind
        output_fields = self.queueDataFields[outputSet]
        if output_fields is None:
            output_fields = queueDict.keys()

        # the context manager closes the file even if serialisation fails
        with open(out_path, "w") as output:
            if format == "txt":
                # one record per line; missing fields are written empty like in the other formats
                for field in output_fields:
                    output.write(f"{field}={queueDict.get(field, '')}\n")
            elif format == "pilot":
                pairs = []
                for field in output_fields:
                    # missing and falsy values are both written as an empty value
                    if field in queueDict and queueDict[field]:
                        pairs.append(field + "=" + str(queueDict[field]))
                    else:
                        pairs.append(field + "=")
                output.write("|".join(pairs))
            elif format == "json":
                selected = {field: queueDict.get(field, "") for field in output_fields}
                json.dump(self.queueDictPythonise(selected), output, sort_keys=True, indent=4)

        # a copy of the file, when makes sense, with filename based on siteid
        alias_path = os.path.join(dest, queueDict["siteid"] + suffix)
        if alias_path != out_path:
            shutil.copy(out_path, alias_path)

    def queueDictPythonise(self, queueDict, deepCopy=True):
        """
        Turn queue dictionary with SQL text fields into a more structured python representation

        ``releases`` given as a ``|``-separated string becomes a list, and
        ``lastmod``/``tspace`` values offering ``isoformat()`` are converted to
        ISO strings. Other values (None, strings, ...) are left untouched.

        :param queueDict: queue record to convert
        :param deepCopy: work on a deep copy (default) instead of modifying queueDict in place
        :return: the converted dictionary
        """
        struct_dict = deepcopy(queueDict) if deepCopy else queueDict

        releases = struct_dict.get("releases")
        if isinstance(releases, str):
            struct_dict["releases"] = releases.split("|")

        for time_key in ("lastmod", "tspace"):
            value = struct_dict.get(time_key)
            # JSON-sourced records may carry None or an already formatted string here
            if hasattr(value, "isoformat"):
                struct_dict[time_key] = value.isoformat()
        return struct_dict

    def dumpAllSchedConfig(self, queueArray=None, dest="/tmp"):
        """
        Dumps all of schedconfig into a single json file - allows clients to retrieve a
        machine readable version of schedconfig efficiently

        The result is written to ``<dest>/schedconfig.all.json``, keyed by queue
        nickname; afterwards the pilot GDP configuration is dumped as well.

        :param queueArray: queue records to dump; defaults to ``self.queueData``
        :param dest: destination directory
        """
        out_path = os.path.join(dest, "schedconfig.all.json")
        if queueArray is None:
            queueArray = self.queueData
        # build the payload first so a failure here does not truncate an existing file
        dump_me = {queue_dict["nickname"]: self.queueDictPythonise(queue_dict) for queue_dict in queueArray}
        with open(out_path, "w") as output:
            json.dump(dump_me, output, sort_keys=True, indent=4)
        self.dump_pilot_gdp_config(dest)

    def dump_pilot_gdp_config(self, dest="/tmp"):
        """
        Write the config entries of the ``pilot`` app to ``<dest>/pilot_gdp_config.json``.

        The output maps each vo to a ``{key: value}`` dictionary, with values
        obtained from ``tbuf.getConfigValue``.

        :param dest: destination directory
        """
        app = "pilot"
        dump_me = {}
        sql = f"SELECT key, component, vo from {panda_config.schemaPANDA}.config where app=:app"
        rows = self.tbuf.querySQL(sql, {":app": app})
        for key, component, vo in rows:
            per_vo = dump_me.setdefault(vo, {})
            per_vo[key] = self.tbuf.getConfigValue(component, key, app, vo)
        # dump
        _logger.debug(f"pilot GDP config: {dump_me}")
        with open(os.path.join(dest, "pilot_gdp_config.json"), "w") as f:
            json.dump(dump_me, f, sort_keys=True, indent=4)
0228 
def main(argv=tuple(), tbuf=None, **kwargs):
    """
    Dump schedconfig for every queue into cache files.

    For each queue the pilot set is written in pilot and json format and the
    all and factory sets in json format; afterwards the aggregated
    schedconfig.all.json is produced. Failures are logged, not raised.

    :param argv: command line in sys.argv form (the first element is skipped);
                 -o/--output selects the destination directory
    :param tbuf: task buffer to use; when None one is initialised here and
                 cleaned up again before returning
    :param kwargs: accepted for call compatibility and ignored
    """
    _logger.debug("start")
    requester_id = None
    # set once this function has initialised its own task buffer, so that the
    # cleanup below only touches a connection created here
    owns_task_buffer = False
    try:
        # parse arguments
        parser = ArgumentParser()
        parser.add_argument(
            "-o",
            "--output",
            dest="dirname",
            default=default_dest_dir,
            help="write cache outputs to DIR",
            metavar="DIR",
        )
        args = parser.parse_args(list(argv)[1:])
        # ensure destination directory; parents=True because the default location is nested
        Path(args.dirname).mkdir(mode=0o755, parents=True, exist_ok=True)
        # instantiate TB
        requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
        if tbuf is None:
            from pandaserver.taskbuffer.TaskBuffer import taskBuffer

            taskBuffer.init(
                panda_config.dbhost,
                panda_config.dbpasswd,
                nDBConnection=1,
                useTimeout=True,
                requester=requester_id,
            )
            owns_task_buffer = True
        else:
            taskBuffer = tbuf

        # initialize
        cacher = cacheSchedConfig(tbuf=taskBuffer)
        cacher.getQueueData()

        # dump
        for queue in cacher.queueData:
            for output_set, output_format in (
                ("pilot", "pilot"),
                ("pilot", "json"),
                ("all", "json"),
                ("factory", "json"),
            ):
                cacher.dumpSingleQueue(queue, dest=args.dirname, outputSet=output_set, format=output_format)
        _logger.debug("dumped json files for each queue")
        # Big dumper
        cacher.dumpAllSchedConfig(dest=args.dirname)
        _logger.debug("dumped schedconfig.all.json")
    except Exception:
        err_str = traceback.format_exc()
        _logger.error(f"failed to copy files: {err_str}")
    finally:
        # stop taskBuffer if created inside this script, also when the dump failed
        if owns_task_buffer:
            try:
                taskBuffer.cleanup(requester=requester_id)
            except Exception:
                _logger.error(f"failed to clean up taskBuffer: {traceback.format_exc()}")
    # done
    _logger.debug("done")
0283 
0284 
# script entry point: hand the process command line to main()
if __name__ == "__main__":
    main(sys.argv)