File indexing completed on 2026-04-10 08:39:01
0001
0002
0003
0004
0005
0006
0007 import json
0008 import os
0009 import shutil
0010 import sys
0011 import traceback
0012 from argparse import ArgumentParser
0013 from copy import deepcopy
0014 from pathlib import Path
0015
0016 from pandacommon.pandalogger.PandaLogger import PandaLogger
0017 from pandacommon.pandautils.thread_utils import GenericThread
0018
0019 from pandaserver.config import panda_config
0020
0021
# Module-level logger for the schedconfig cache dumper
_logger = PandaLogger().getLogger("cache_schedconfig")


# Destination directory for the cache files; can be overridden through
# panda_config.schedconfig_cache_dir
default_dest_dir = getattr(panda_config, "schedconfig_cache_dir", "/var/cache/pandaserver/schedconfig")
0026
0027
0028 class cacheSchedConfig:
0029 """
0030 Class to dump schedconfig on a per-queue basis into cache files
0031 """
0032
0033 def __init__(self, tbuf):
0034 self.tbuf = tbuf
0035 self.queueData = None
0036 self.cloudStatus = None
0037
0038 self.queueDataFields = {
0039
0040
0041 "pilot": [
0042 "appdir",
0043 "allowdirectaccess",
0044 "cloud",
0045 "datadir",
0046 "dq2url",
0047 "copytool",
0048 "copytoolin",
0049 "copysetup",
0050 "copysetupin",
0051 "ddm",
0052 "se",
0053 "sepath",
0054 "seprodpath",
0055 "envsetup",
0056 "envsetupin",
0057 "region",
0058 "copyprefix",
0059 "copyprefixin",
0060 "lfcpath",
0061 "lfcprodpath",
0062 "lfchost",
0063 "lfcregister",
0064 "sein",
0065 "wntmpdir",
0066 "proxy",
0067 "retry",
0068 "recoverdir",
0069 "space",
0070 "memory",
0071 "cmtconfig",
0072 "status",
0073 "setokens",
0074 "glexec",
0075 "seopt",
0076 "gatekeeper",
0077 "pcache",
0078 "maxinputsize",
0079 "timefloor",
0080 "corecount",
0081 "faxredirector",
0082 "allowfax",
0083 "maxtime",
0084 "maxwdir",
0085 ],
0086 "factory": [
0087 "site",
0088 "siteid",
0089 "nickname",
0090 "cloud",
0091 "status",
0092 "jdl",
0093 "queue",
0094 "localqueue",
0095 "nqueue",
0096 "environ",
0097 "proxy",
0098 "glexec",
0099 "depthboost",
0100 "idlepilotsupression",
0101 "pilotlimit",
0102 "transferringlimit",
0103 "memory",
0104 "maxtime",
0105 "system",
0106 "fairsharepolicy",
0107 "autosetup_pre",
0108 "autosetup_post",
0109 ],
0110
0111 "all": None,
0112 }
0113
0114 def query_column_sql(self, sql, varMap=None, arraySize=100):
0115 res = self.tbuf.querySQL(sql, varMap, arraySize=arraySize)
0116 retList = []
0117 for (
0118 panda_queue,
0119 data,
0120 ) in res:
0121 if isinstance(data, str):
0122 dictData = json.loads(data)
0123 elif isinstance(data, dict):
0124 dictData = data
0125 else:
0126 dictData = json.loads(data.read())
0127 dictData["siteid"] = panda_queue
0128 retList.append(dictData)
0129 return retList
0130
0131 def getQueueData(self, site=None, queue=None):
0132
0133 varDict = {}
0134 sql = f"SELECT panda_queue, data from {panda_config.schemaPANDA}.SCHEDCONFIG_JSON"
0135 if site:
0136 sql += " where panda_queue=:site"
0137 varDict[":site"] = site
0138 self.queueData = self.query_column_sql(sql, varDict)
0139 elif queue:
0140 sql += " where panda_queue=:queue"
0141 varDict[":queue"] = queue
0142 self.queueData = self.query_column_sql(sql, varDict)
0143 else:
0144 self.queueData = self.query_column_sql(sql)
0145
0146 def dumpSingleQueue(self, queueDict, dest="/tmp", outputSet="all", format="txt"):
0147 try:
0148 file = os.path.join(dest, queueDict["nickname"] + "." + outputSet + "." + format)
0149 output = open(file, "w")
0150 outputFields = self.queueDataFields[outputSet]
0151 if outputFields is None:
0152 outputFields = queueDict.keys()
0153 if format == "txt":
0154 for outputField in outputFields:
0155 output.write(outputField + "=" + str(queueDict[outputField]))
0156 if format == "pilot":
0157 outputStr = ""
0158 for outputField in outputFields:
0159 if outputField in queueDict and queueDict[outputField]:
0160 outputStr += outputField + "=" + str(queueDict[outputField]) + "|"
0161 else:
0162 outputStr += outputField + "=|"
0163 output.write(outputStr[:-1])
0164 if format == "json":
0165 dumpMe = {}
0166 for outputField in outputFields:
0167 if outputField in queueDict:
0168 val = queueDict[outputField]
0169 else:
0170 val = ""
0171 dumpMe[outputField] = val
0172 json.dump(self.queueDictPythonise(dumpMe), output, sort_keys=True, indent=4)
0173 output.close()
0174
0175 newfile = os.path.join(dest, queueDict["siteid"] + "." + outputSet + "." + format)
0176 if newfile != file:
0177 shutil.copy(file, newfile)
0178 except Exception:
0179 raise
0180
0181 def queueDictPythonise(self, queueDict, deepCopy=True):
0182 """Turn queue dictionary with SQL text fields into a more stuctured python representation"""
0183 if deepCopy:
0184 structDict = deepcopy(queueDict)
0185 else:
0186 structDict = queueDict
0187
0188 if "releases" in structDict and structDict["releases"] is not None:
0189 if isinstance(structDict["releases"], str):
0190 structDict["releases"] = structDict["releases"].split("|")
0191
0192 for timeKey in "lastmod", "tspace":
0193 if timeKey in structDict:
0194 structDict[timeKey] = structDict[timeKey].isoformat()
0195 return structDict
0196
0197 def dumpAllSchedConfig(self, queueArray=None, dest="/tmp"):
0198 """Dumps all of schedconfig into a single json file - allows clients to retrieve a
0199 machine readable version of schedconfig efficiently"""
0200 file = os.path.join(dest, "schedconfig.all.json")
0201 if queueArray is None:
0202 queueArray = self.queueData
0203 output = open(file, "w")
0204 dumpMe = {}
0205 for queueDict in queueArray:
0206 dumpMe[queueDict["nickname"]] = {}
0207 for k in queueDict:
0208 v = queueDict[k]
0209 dumpMe[queueDict["nickname"]][k] = v
0210 dumpMe[queueDict["nickname"]] = self.queueDictPythonise(dumpMe[queueDict["nickname"]])
0211 json.dump(dumpMe, output, sort_keys=True, indent=4)
0212 self.dump_pilot_gdp_config(dest)
0213
0214 def dump_pilot_gdp_config(self, dest="/tmp"):
0215 app = "pilot"
0216 dump_me = {}
0217 sql = f"SELECT key, component, vo from {panda_config.schemaPANDA}.config where app=:app"
0218 r = self.tbuf.querySQL(sql, {":app": app})
0219 for key, component, vo in r:
0220 dump_me.setdefault(vo, {})
0221 value = self.tbuf.getConfigValue(component, key, app, vo)
0222 dump_me[vo][key] = value
0223
0224 _logger.debug(f"pilot GDP config: {str(dump_me)}")
0225 with open(os.path.join(dest, "pilot_gdp_config.json"), "w") as f:
0226 json.dump(dump_me, f, sort_keys=True, indent=4)
0227
0228
def main(argv=tuple(), tbuf=None, **kwargs):
    """Dump the schedconfig cache files for all queues.

    :param argv: command-line style arguments; argv[0] is skipped,
                 ``-o/--output DIR`` selects the destination directory
                 (defaults to default_dest_dir)
    :param tbuf: optional pre-initialised task buffer; when None a local
                 connection is created and cleaned up at the end
    :param kwargs: ignored, accepted for interface compatibility
    """
    _logger.debug("start")
    try:
        # parse the destination directory option
        parser = ArgumentParser()
        parser.add_argument(
            "-o",
            "--output",
            dest="dirname",
            default=default_dest_dir,
            help="write cache outputs to DIR",
            metavar="DIR",
        )
        args = parser.parse_args(list(argv)[1:])

        # make sure the destination directory exists (including parents)
        dest_dir_path = Path(args.dirname)
        dest_dir_path.mkdir(mode=0o755, parents=True, exist_ok=True)

        requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
        # only open our own DB connection when no task buffer was handed in
        if tbuf is None:
            from pandaserver.taskbuffer.TaskBuffer import taskBuffer

            taskBuffer.init(
                panda_config.dbhost,
                panda_config.dbpasswd,
                nDBConnection=1,
                useTimeout=True,
                requester=requester_id,
            )
        else:
            taskBuffer = tbuf

        # load every queue configuration from the DB
        cacher = cacheSchedConfig(tbuf=taskBuffer)
        cacher.getQueueData()

        # per-queue cache files in the various flavours
        for queue in cacher.queueData:
            cacher.dumpSingleQueue(queue, dest=args.dirname, outputSet="pilot", format="pilot")
            cacher.dumpSingleQueue(queue, dest=args.dirname, outputSet="pilot", format="json")
            cacher.dumpSingleQueue(queue, dest=args.dirname, outputSet="all", format="json")
            cacher.dumpSingleQueue(queue, dest=args.dirname, outputSet="factory", format="json")
        _logger.debug("dumped json files for each queue")

        # single machine-readable dump of the whole schedconfig
        cacher.dumpAllSchedConfig(dest=args.dirname)
        _logger.debug("dumped schedconfig.all.json")

        # only clean up connections we opened ourselves
        if tbuf is None:
            taskBuffer.cleanup(requester=requester_id)
    except Exception:
        err_str = traceback.format_exc()
        _logger.error(f"failed to dump schedconfig cache files: {err_str}")

    _logger.debug("done")
0283
0284
# Entry point when executed as a stand-alone script
if __name__ == "__main__":
    main(argv=sys.argv)