import argparse
import json
import logging
import random
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore import fifos as harvesterFifos
from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
from pandaharvester.harvestercore.work_spec import WorkSpec
from pandaharvester.harvestermisc.selfcheck import harvesterPackageInfo


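# Attach a colorized StreamHandler to the given logger: emit() is wrapped so each
# record gets an ANSI color matching its level before being formatted and printed.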
def setupLogger(logger):
    logger.setLevel(logging.DEBUG)
    hdlr = logging.StreamHandler()

    def emit_decorator(fn):
        def func(*args):
            levelno = args[0].levelno
            if levelno >= logging.CRITICAL:
                color = "\033[35;1m"
            elif levelno >= logging.ERROR:
                color = "\033[31;1m"
            elif levelno >= logging.WARNING:
                color = "\033[33;1m"
            elif levelno >= logging.INFO:
                color = "\033[32;1m"
            elif levelno >= logging.DEBUG:
                color = "\033[36;1m"
            else:
                color = "\033[0m"
            formatter = logging.Formatter(f"{color}[%(asctime)s %(levelname)s] %(message)s\x1b[0m")
            hdlr.setFormatter(formatter)
            return fn(*args)

        return func

    hdlr.emit = emit_decorator(hdlr.emit)
    logger.addHandler(hdlr)


mainLogger = logging.getLogger("HarvesterAdminTool")
setupLogger(mainLogger)


def json_print(data):
    print(json.dumps(data, sort_keys=True, indent=4))


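# Run func over range(n_objects) using a pool of n_threads threads; the optional
# initializer is called once per worker thread (used below to give each thread its own FIFO).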
def multithread_executer(func, n_objects, n_threads, initializer=None, initargs=()):
    with ThreadPoolExecutor(n_threads, initializer=initializer, initargs=initargs) as _pool:
        retIterator = _pool.map(func, range(n_objects))
    return retIterator


def get_harvester_attributes():
    attr_list = [
        "harvesterID",
        "version",
        "commit_info",
        "harvester_config",
    ]
    return attr_list


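# Repopulate the named agent FIFOs (clearing them first); with no names given, every
# FIFO in fifo_class_name_map is repopulated. Unknown names and disabled FIFOs are skipped.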
def repopulate_fifos(*names):
    fifo_class_name_map = {
        "monitor": "MonitorFIFO",
    }
    if len(names) > 0:
        fifo_class_name_list = [fifo_class_name_map.get(name) for name in names]
    else:
        fifo_class_name_list = fifo_class_name_map.values()
    for fifo_class_name in fifo_class_name_list:
        if fifo_class_name is None:
            continue
        fifo = getattr(harvesterFifos, fifo_class_name)()
        if not fifo.enabled:
            continue
        fifo.populate(clear_fifo=True)
        print(f"Repopulated {fifo.titleName} fifo")


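# Command handlers: one function per admin subcommand, wired up via commandMap below.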
def test(arguments):
    mainLogger.critical("Harvester Admin Tool: test CRITICAL")
    mainLogger.error("Harvester Admin Tool: test ERROR")
    mainLogger.warning("Harvester Admin Tool: test WARNING")
    mainLogger.info("Harvester Admin Tool: test INFO")
    mainLogger.debug("Harvester Admin Tool: test DEBUG")
    print("Harvester Admin Tool: test")


def get(arguments):
    attr = arguments.attribute
    hpi = harvesterPackageInfo(None)
    if attr not in get_harvester_attributes():
        mainLogger.error(f"Invalid attribute: {attr}")
        return
    elif attr == "version":
        print(hpi.version)
    elif attr == "commit_info":
        print(hpi.commit_info)
    elif attr == "harvesterID":
        print(harvester_config.master.harvester_id)
    elif attr == "harvester_config":
        json_print(harvester_config.config_dict)


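# Benchmark the configured FIFO backend: time put, get, protective get and clear
# operations using per-thread BenchmarkFIFO instances, then print per-object averages.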
def fifo_benchmark(arguments):
    n_objects = arguments.n_objects
    n_threads = arguments.n_threads
    sum_dict = {
        "put_n": 0,
        "put_time": 0.0,
        "get_time": 0.0,
        "get_protective_time": 0.0,
        "clear_time": 0.0,
    }
    thread_lock = threading.Lock()
    thread_fifo_map = {}

    def _thread_initializer():
        thread_id = threading.get_ident()
        with thread_lock:
            thread_fifo_map[thread_id] = harvesterFifos.BenchmarkFIFO()

    def _put_object(i_index):
        mq = thread_fifo_map.get(threading.get_ident())
        if mq is None:
            return
        workspec = WorkSpec()
        workspec.workerID = i_index
        data = {"random": [(i_index**2) % 2**16, random.random()]}
        workspec.workAttributes = data
        mq.put(workspec)

    def _get_object(i_index):
        mq = thread_fifo_map.get(threading.get_ident())
        if mq is None:
            return
        return mq.get(timeout=3, protective=False)

    def _get_object_protective(i_index):
        mq = thread_fifo_map.get(threading.get_ident())
        if mq is None:
            return
        return mq.get(timeout=3, protective=True)

    def put_test():
        sw = core_utils.get_stopwatch()
        sw.reset()
        multithread_executer(_put_object, n_objects, n_threads, _thread_initializer)
        sum_dict["put_time"] += sw.get_elapsed_time_in_sec()
        sum_dict["put_n"] += 1
        print(f"Put {n_objects} objects by {n_threads} threads" + sw.get_elapsed_time())
        benchmark_mq = harvesterFifos.BenchmarkFIFO()
        print(f"Now fifo size is {benchmark_mq.size()}")
        del benchmark_mq

    def get_test():
        sw = core_utils.get_stopwatch()
        sw.reset()
        multithread_executer(_get_object, n_objects, n_threads, _thread_initializer)
        sum_dict["get_time"] = sw.get_elapsed_time_in_sec()
        print(f"Get {n_objects} objects by {n_threads} threads" + sw.get_elapsed_time())
        benchmark_mq = harvesterFifos.BenchmarkFIFO()
        print(f"Now fifo size is {benchmark_mq.size()}")
        del benchmark_mq

    def get_protective_test():
        sw = core_utils.get_stopwatch()
        sw.reset()
        multithread_executer(_get_object_protective, n_objects, n_threads, _thread_initializer)
        sum_dict["get_protective_time"] = sw.get_elapsed_time_in_sec()
        print(f"Get {n_objects} objects protective dequeue by {n_threads} threads" + sw.get_elapsed_time())
        benchmark_mq = harvesterFifos.BenchmarkFIFO()
        print(f"Now fifo size is {benchmark_mq.size()}")
        del benchmark_mq

    def clear_test():
        benchmark_mq = harvesterFifos.BenchmarkFIFO()
        sw = core_utils.get_stopwatch()
        sw.reset()
        benchmark_mq.fifo.clear()
        sum_dict["clear_time"] = sw.get_elapsed_time_in_sec()
        print("Cleared fifo" + sw.get_elapsed_time())
        print(f"Now fifo size is {benchmark_mq.size()}")
        del benchmark_mq

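    # Alternate put_test with the get/clear variants so every timed phase starts
    # from a freshly filled FIFO.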
    print("Start fifo benchmark ...")
    benchmark_mq = harvesterFifos.BenchmarkFIFO()
    benchmark_mq.fifo.clear()
    print("Cleared fifo")
    put_test()
    get_test()
    put_test()
    get_protective_test()
    put_test()
    clear_test()
    print("Finished fifo benchmark")

    print("Summary:")
    print(f"FIFO plugin is: {benchmark_mq.fifo.__class__.__name__}")
    print(f"Benchmark with {n_objects} objects by {n_threads} threads")
    print(f"Put : {1000.0 * sum_dict['put_time'] / (sum_dict['put_n'] * n_objects):.3f} ms / obj")
    print(f"Get : {1000.0 * sum_dict['get_time'] / n_objects:.3f} ms / obj")
    print(f"Get protective : {1000.0 * sum_dict['get_protective_time'] / n_objects:.3f} ms / obj")
    print(f"Clear : {1000.0 * sum_dict['clear_time'] / n_objects:.3f} ms / obj")


def fifo_repopulate(arguments):
    if "ALL" in arguments.name_list:
        repopulate_fifos()
    else:
        repopulate_fifos(*arguments.name_list)


def cacher_refresh(arguments):
    from pandaharvester.harvesterbody.cacher import Cacher
    from pandaharvester.harvestercore.communicator_pool import CommunicatorPool

    communicatorPool = CommunicatorPool()
    cacher = Cacher(communicatorPool)
    cacher.execute(force_update=True, skip_lock=True, n_threads=4)


def qconf_list(arguments):
    from pandaharvester.harvesterscripts import queue_config_tool

    if arguments.all:
        queue_config_tool.list_config_ids()
    else:
        queue_config_tool.list_active_queues()


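# Reload queue configurations immediately: reset the mapper's reload bookkeeping and
# call load_data(), optionally refilling the pq_table first.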
def qconf_refresh(arguments):
    from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

    qcm = QueueConfigMapper()
    qcm._update_last_reload_time()
    qcm.lastUpdate = None
    qcm.load_data(refill_table=arguments.refill)


def qconf_dump(arguments):
    from pandaharvester.harvesterscripts import queue_config_tool

    to_print = not arguments.json
    try:
        if arguments.id_list:
            res_list = [vars(queue_config_tool.dump_queue_with_config_id(configID, to_print)) for configID in arguments.id_list]
            resObj = {obj.get("queueName"): obj for obj in res_list}
        elif arguments.all:
            res_list = queue_config_tool.dump_all_active_queues(to_print)
            if res_list is None or to_print:
                resObj = {}
            else:
                resObj = {vars(qm).get("queueName", ""): vars(qm) for qm in res_list}
        else:
            resObj = {queue: vars(queue_config_tool.dump_active_queue(queue, to_print)) for queue in arguments.queue_list}
    except TypeError as e:
        if str(e) == "vars() argument must have __dict__ attribute":
            resObj = {}
        else:
            raise
    if arguments.json:
        json_print(resObj)


def qconf_purge(arguments):
    queueName = arguments.queue
    dbProxy = DBProxy()
    retVal = dbProxy.purge_pq(queueName)
    if retVal:
        print(f"Purged {queueName} from harvester DB")
    else:
        mainLogger.critical(f"Failed to purge {queueName}. See panda-db_proxy.log")


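# Flag workers to be killed by the sweeper, selected by status, computingSite,
# computingElement and submissionHost; a single "ALL" value for a field matches everything.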
def kill_workers(arguments):
    status_in = "ALL" if (len(arguments.status) == 1 and arguments.status[0] == "ALL") else arguments.status
    computingSite_in = "ALL" if (len(arguments.sites) == 1 and arguments.sites[0] == "ALL") else arguments.sites
    computingElement_in = "ALL" if (len(arguments.ces) == 1 and arguments.ces[0] == "ALL") else arguments.ces
    submissionHost_in = "ALL" if (len(arguments.submissionhosts) == 1 and arguments.submissionhosts[0] == "ALL") else arguments.submissionhosts
    dbProxy = DBProxy()
    retVal = dbProxy.mark_workers_to_kill_by_query(
        {"status": status_in, "computingSite": computingSite_in, "computingElement": computingElement_in, "submissionHost": submissionHost_in}
    )
    if retVal is not None:
        msg_temp = (
            "Sweeper will soon kill {n_workers} workers, with "
            "status in {status_in}, "
            "computingSite in {computingSite_in}, "
            "computingElement in {computingElement_in}, "
            "submissionHost in {submissionHost_in}"
        )
        print(
            msg_temp.format(
                n_workers=retVal,
                status_in=status_in,
                computingSite_in=computingSite_in,
                computingElement_in=computingElement_in,
                submissionHost_in=submissionHost_in,
            )
        )
    else:
        mainLogger.critical("Failed to kill workers. See panda-db_proxy.log")


def query_workers(arguments):
    dbProxy = DBProxy()
    if arguments.all:
        res_obj = dbProxy.get_worker_stats_full()
    else:
        res_obj = dbProxy.get_worker_stats_full(filter_site_list=arguments.queue_list)
    json_print(res_obj)


def query_jobs(arguments):
    dbProxy = DBProxy()
    if arguments.all:
        res_obj = dbProxy.get_job_stats_full()
    else:
        res_obj = dbProxy.get_job_stats_full(filter_site_list=arguments.queue_list)
    json_print(res_obj)


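# Dispatch table mapping the "which" value set by each argparse subparser to its handler.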
commandMap = {
    "test": test,

    "get": get,

    "fifo_benchmark": fifo_benchmark,
    "fifo_repopulate": fifo_repopulate,

    "cacher_refresh": cacher_refresh,

    "qconf_list": qconf_list,
    "qconf_dump": qconf_dump,
    "qconf_refresh": qconf_refresh,
    "qconf_purge": qconf_purge,

    "kill_workers": kill_workers,

    "query_workers": query_workers,
    "query_jobs": query_jobs,
}


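# Build the argparse command tree, parse the command line, and dispatch to the selected
# command handler, exiting with whatever the handler returns.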
def main():
    oparser = argparse.ArgumentParser(prog="harvester-admin", add_help=True)
    subparsers = oparser.add_subparsers()
    oparser.add_argument("-v", "--verbose", "--debug", action="store_true", dest="debug", help="Print more verbose output (debug mode)")

    test_parser = subparsers.add_parser("test", help="for testing only")
    test_parser.set_defaults(which="test")

    get_parser = subparsers.add_parser("get", help="get attributes of this harvester")
    get_parser.set_defaults(which="get")
    get_parser.add_argument("attribute", type=str, action="store", metavar="<attribute>", choices=get_harvester_attributes(), help="attribute")

    fifo_parser = subparsers.add_parser("fifo", help="fifo related")
    fifo_subparsers = fifo_parser.add_subparsers()

    fifo_benchmark_parser = fifo_subparsers.add_parser("benchmark", help="benchmark fifo backend")
    fifo_benchmark_parser.set_defaults(which="fifo_benchmark")
    fifo_benchmark_parser.add_argument("-n", type=int, dest="n_objects", action="store", default=500, metavar="<N>", help="Benchmark with N objects")
    fifo_benchmark_parser.add_argument("-t", type=int, dest="n_threads", action="store", default=1, metavar="<N>", help="Benchmark with N threads")

    fifo_repopulate_parser = fifo_subparsers.add_parser("repopulate", help="Repopulate agent fifo")
    fifo_repopulate_parser.set_defaults(which="fifo_repopulate")
    fifo_repopulate_parser.add_argument(
        "name_list", nargs="+", type=str, action="store", metavar="<agent_name>", help='Name of agent fifo, e.g. "monitor" ("ALL" for all)'
    )

    cacher_parser = subparsers.add_parser("cacher", help="cacher related")
    cacher_subparsers = cacher_parser.add_subparsers()

    cacher_refresh_parser = cacher_subparsers.add_parser("refresh", help="refresh cacher immediately")
    cacher_refresh_parser.set_defaults(which="cacher_refresh")

    qconf_parser = subparsers.add_parser("qconf", help="queue configuration")
    qconf_subparsers = qconf_parser.add_subparsers()

    qconf_list_parser = qconf_subparsers.add_parser("list", help="List queues. Only active queues listed by default")
    qconf_list_parser.set_defaults(which="qconf_list")
    qconf_list_parser.add_argument("-a", "--all", dest="all", action="store_true", help="List name and configID of all queues")

    qconf_dump_parser = qconf_subparsers.add_parser("dump", help="Dump queue configurations")
    qconf_dump_parser.set_defaults(which="qconf_dump")
    qconf_dump_parser.add_argument("-J", "--json", dest="json", action="store_true", help="Dump configuration in JSON format")
    qconf_dump_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Dump configuration of all active queues")
    # nargs="*" so that --all or --id can be used without listing queue names
    qconf_dump_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="<queue_name>", help="Name of active queue")
    qconf_dump_parser.add_argument(
        "-i", "--id", dest="id_list", nargs="+", type=int, action="store", metavar="<configID>", help="Dump configuration of queue with configID"
    )

    qconf_refresh_parser = qconf_subparsers.add_parser("refresh", help="refresh queue configuration immediately")
    qconf_refresh_parser.set_defaults(which="qconf_refresh")
    qconf_refresh_parser.add_argument("-R", "--refill", dest="refill", action="store_true", help="Refill pq_table before refresh (cleaner)")

    qconf_purge_parser = qconf_subparsers.add_parser("purge", help="Purge the queue thoroughly from harvester DB (be careful!)")
    qconf_purge_parser.set_defaults(which="qconf_purge")
    qconf_purge_parser.add_argument("queue", type=str, action="store", metavar="<queue_name>", help="Name of panda queue to purge")

    kill_parser = subparsers.add_parser("kill", help="kill something alive")
    kill_subparsers = kill_parser.add_subparsers()

    kill_workers_parser = kill_subparsers.add_parser("workers", help="Kill active workers by query")
    kill_workers_parser.set_defaults(which="kill_workers")
    kill_workers_parser.add_argument(
        "--status",
        nargs="+",
        dest="status",
        action="store",
        metavar="<status>",
        default=["submitted"],
        help='worker status (only "submitted", "idle", "running" are valid)',
    )
    kill_workers_parser.add_argument(
        "--sites", nargs="+", dest="sites", action="store", metavar="<site>", required=True, help='site (computingSite); "ALL" for all sites'
    )
    kill_workers_parser.add_argument(
        "--ces", nargs="+", dest="ces", action="store", metavar="<ce>", required=True, help='CE (computingElement); "ALL" for all CEs'
    )
    kill_workers_parser.add_argument(
        "--submissionhosts",
        nargs="+",
        dest="submissionhosts",
        action="store",
        metavar="<submission_host>",
        required=True,
        help='submission host (submissionHost); "ALL" for all submission hosts',
    )

    query_parser = subparsers.add_parser("query", help="query current status about harvester")
    query_subparsers = query_parser.add_subparsers()

    query_workers_parser = query_subparsers.add_parser("workers", help="Query statistics of workers in queues")
    query_workers_parser.set_defaults(which="query_workers")
    query_workers_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues")
    # nargs="*" so that --all can be used without listing queue names
    query_workers_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="<queue_name>", help="Name of active queue")

    query_jobs_parser = query_subparsers.add_parser("jobs", help="Query statistics of jobs in queues")
    query_jobs_parser.set_defaults(which="query_jobs")
    query_jobs_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues")
    query_jobs_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="<queue_name>", help="Name of active queue")

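    # Parse the command line, set logger verbosity, then look up and run the selected command.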
    if len(sys.argv) == 1:
        oparser.print_help()
        sys.exit(1)
    arguments = oparser.parse_args(sys.argv[1:])

    if arguments.debug:
        mainLogger.setLevel(logging.DEBUG)
    else:
        mainLogger.setLevel(logging.WARNING)

    try:
        command = commandMap.get(arguments.which)
    except AttributeError:
        oparser.print_help()
        sys.exit(1)
    try:
        start_time = time.time()
        result = command(arguments)
        end_time = time.time()
        if arguments.debug:
            mainLogger.debug(f"ARGS: {arguments} ; RESULT: {result} ")
            mainLogger.debug(f"Action completed in {end_time - start_time:.3f} seconds")
        sys.exit(result)
    except (RuntimeError, NotImplementedError) as e:
        mainLogger.critical(f"ERROR: {e}")
        sys.exit(1)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        mainLogger.critical("Command interrupted!")
        sys.exit(1)