Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import argparse
0002 import json
0003 import logging
0004 import random
0005 import sys
0006 import threading
0007 import time
0008 from concurrent.futures import ThreadPoolExecutor
0009 
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 from pandaharvester.harvestercore import fifos as harvesterFifos
0013 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0014 from pandaharvester.harvestercore.work_spec import WorkSpec
0015 from pandaharvester.harvestermisc.selfcheck import harvesterPackageInfo
0016 
0017 # === Logger ===================================================
0018 
0019 
def setupLogger(logger):
    """Attach a colorized stream handler to *logger* and set it to DEBUG.

    The handler's ``emit`` is wrapped so that, right before each record is
    written, a formatter carrying the ANSI color matching the record's level
    is installed on the handler.

    Args:
        logger: the ``logging.Logger`` to configure.
    """
    logger.setLevel(logging.DEBUG)
    stream_handler = logging.StreamHandler()

    # (threshold, ANSI color) pairs, checked from the most to the least severe
    level_colors = (
        (logging.CRITICAL, "\033[35;1m"),
        (logging.ERROR, "\033[31;1m"),
        (logging.WARNING, "\033[33;1m"),
        (logging.INFO, "\033[32;1m"),
        (logging.DEBUG, "\033[36;1m"),
    )

    def emit_decorator(fn):
        def func(*args):
            # fn is the bound emit, so the first positional argument is the record
            record_level = args[0].levelno
            color = "\033[0m"
            for threshold, ansi_code in level_colors:
                if record_level >= threshold:
                    color = ansi_code
                    break
            stream_handler.setFormatter(logging.Formatter(f"{color}[%(asctime)s %(levelname)s] %(message)s\x1b[0m"))
            return fn(*args)

        return func

    stream_handler.emit = emit_decorator(stream_handler.emit)
    logger.addHandler(stream_handler)
0047 
0048 
# Module-level logger used by the command functions below; setupLogger
# attaches the colorized stream handler and sets the level to DEBUG.
mainLogger = logging.getLogger("HarvesterAdminTool")
setupLogger(mainLogger)
0051 
0052 # === Operation functions ========================================================
0053 
0054 
def json_print(data):
    """Print *data* to stdout as JSON with sorted keys and 4-space indentation."""
    rendered = json.dumps(data, indent=4, sort_keys=True)
    print(rendered)
0057 
0058 
def multithread_executer(func, n_objects, n_threads, initializer=None, initargs=()):
    """Run ``func(i)`` for every ``i`` in ``range(n_objects)`` on a thread pool.

    Args:
        func: callable taking the object index.
        n_objects: number of indices to map over.
        n_threads: size of the thread pool.
        initializer: optional callable run at the start of each worker thread.
        initargs: positional arguments passed to *initializer*.

    Returns:
        The iterator produced by the pool's ``map``. The pool has already been
        shut down (waiting for all tasks) when it is returned; an exception
        raised by *func* only surfaces when the iterator is consumed.
    """
    pool = ThreadPoolExecutor(n_threads, initializer=initializer, initargs=initargs)
    try:
        results = pool.map(func, range(n_objects))
    finally:
        # same effect as leaving the executor's with-block: wait for all tasks
        pool.shutdown(wait=True)
    return results
0063 
0064 
def get_harvester_attributes():
    """Return a fresh list of the attribute names accepted by the ``get`` command."""
    return ["harvesterID", "version", "commit_info", "harvester_config"]
0073 
0074 
def repopulate_fifos(*names):
    """Clear and repopulate the FIFOs of the given agents.

    Args:
        *names: agent names (currently only "monitor" is mapped). With no
            names, every mapped agent FIFO is processed. Names without a
            mapping are skipped silently, as are FIFOs that are not enabled.
    """
    fifo_class_name_map = {"monitor": "MonitorFIFO"}
    agent_names = names if names else fifo_class_name_map.keys()
    for agent_name in agent_names:
        class_name = fifo_class_name_map.get(agent_name)
        if class_name is None:
            continue
        fifo = getattr(harvesterFifos, class_name)()
        if fifo.enabled:
            fifo.populate(clear_fifo=True)
            print(f"Repopulated {fifo.titleName} fifo")
0091 
0092 
0093 # TODO
0094 
0095 # === Command functions ========================================================
0096 
0097 
def test(arguments):
    """Emit one message per log level through mainLogger, then print a plain line.

    Args:
        arguments: parsed CLI namespace (not read).
    """
    banner = "Harvester Admin Tool: test"
    for level_name in ("CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"):
        mainLogger.log(getattr(logging, level_name), f"{banner} {level_name}")
    print(banner)
0105 
0106 
def get(arguments):
    """Print the harvester attribute named by ``arguments.attribute``.

    Logs an error and returns when the attribute is not one of those listed
    by ``get_harvester_attributes()``.

    Args:
        arguments: parsed CLI namespace; reads ``attribute``.
    """
    requested = arguments.attribute
    package_info = harvesterPackageInfo(None)
    if requested not in get_harvester_attributes():
        mainLogger.error(f"Invalid attribute: {requested}")
        return
    # each printer is evaluated lazily so only the requested value is read
    printers = {
        "version": lambda: print(package_info.version),
        "commit_info": lambda: print(package_info.commit_info),
        "harvesterID": lambda: print(harvester_config.master.harvester_id),
        "harvester_config": lambda: json_print(harvester_config.config_dict),
    }
    printer = printers.get(requested)
    if printer is not None:
        printer()
0121 
0122 
def fifo_benchmark(arguments):
    """Benchmark the FIFO backend with put, get, protective-get and clear rounds.

    The rounds run against ``harvesterFifos.BenchmarkFIFO``; each round prints
    its elapsed time and the resulting FIFO size, and a per-object summary is
    printed at the end. When the object count is not positive the summary
    prints ``n/a`` instead of dividing by zero.

    Args:
        arguments: parsed CLI namespace; reads ``n_objects`` and ``n_threads``.
    """
    n_objects = arguments.n_objects
    n_threads = arguments.n_threads
    sum_dict = {
        "put_n": 0,
        "put_time": 0.0,
        "get_time": 0.0,
        "get_protective_time": 0.0,
        "clear_time": 0.0,
    }
    thread_lock = threading.Lock()
    thread_fifo_map = {}

    def _thread_initializer():
        # give every worker thread its own FIFO handle, keyed by thread ident
        thread_id = threading.get_ident()
        with thread_lock:
            thread_fifo_map[thread_id] = harvesterFifos.BenchmarkFIFO()

    def _put_object(i_index):
        mq = thread_fifo_map.get(threading.get_ident())
        if mq is None:
            return
        workspec = WorkSpec()
        workspec.workerID = i_index
        data = {"random": [(i_index**2) % 2**16, random.random()]}
        workspec.workAttributes = data
        mq.put(workspec)

    def _make_getter(protective):
        # build the per-index worker for a plain or protective dequeue
        def _getter(i_index):
            mq = thread_fifo_map.get(threading.get_ident())
            if mq is None:
                return None
            return mq.get(timeout=3, protective=protective)

        return _getter

    def _report_fifo_size():
        # open a fresh handle just to read the current size
        size_mq = harvesterFifos.BenchmarkFIFO()
        print(f"Now fifo size is {size_mq.size()}")
        del size_mq

    def _timed_round(worker, label):
        # run one multithreaded round; return its elapsed seconds
        sw = core_utils.get_stopwatch()
        sw.reset()
        multithread_executer(worker, n_objects, n_threads, _thread_initializer)
        elapsed = sw.get_elapsed_time_in_sec()
        print(label + sw.get_elapsed_time())
        _report_fifo_size()
        return elapsed

    def put_test():
        sum_dict["put_time"] += _timed_round(_put_object, f"Put {n_objects} objects by {n_threads} threads")
        sum_dict["put_n"] += 1

    def get_test():
        sum_dict["get_time"] = _timed_round(_make_getter(False), f"Get {n_objects} objects by {n_threads} threads")

    def get_protective_test():
        sum_dict["get_protective_time"] = _timed_round(
            _make_getter(True), f"Get {n_objects} objects protective dequeue by {n_threads} threads"
        )

    def clear_test():
        clear_mq = harvesterFifos.BenchmarkFIFO()
        sw = core_utils.get_stopwatch()
        sw.reset()
        clear_mq.fifo.clear()
        sum_dict["clear_time"] = sw.get_elapsed_time_in_sec()
        print("Cleared fifo" + sw.get_elapsed_time())
        print(f"Now fifo size is {clear_mq.size()}")
        del clear_mq

    def _ms_per_obj(total_sec, n_obj):
        # format the average time per object; avoid ZeroDivisionError for "-n 0"
        if n_obj <= 0:
            return "n/a"
        return f"{1000.0 * total_sec / n_obj:.3f}"

    # Benchmark
    print("Start fifo benchmark ...")
    benchmark_mq = harvesterFifos.BenchmarkFIFO()
    benchmark_mq.fifo.clear()
    print("Cleared fifo")
    # each get/clear round is preceded by a put round so there is data to consume
    put_test()
    get_test()
    put_test()
    get_protective_test()
    put_test()
    clear_test()
    print("Finished fifo benchmark")
    # summary
    print("Summary:")
    print(f"FIFO plugin is: {benchmark_mq.fifo.__class__.__name__}")
    print(f"Benchmark with {n_objects} objects by {n_threads} threads")
    print(f"Put            : {_ms_per_obj(sum_dict['put_time'], sum_dict['put_n'] * n_objects)} ms / obj")
    print(f"Get            : {_ms_per_obj(sum_dict['get_time'], n_objects)} ms / obj")
    print(f"Get protective : {_ms_per_obj(sum_dict['get_protective_time'], n_objects)} ms / obj")
    print(f"Clear          : {_ms_per_obj(sum_dict['clear_time'], n_objects)} ms / obj")
0224 
0225 
def fifo_repopulate(arguments):
    """Repopulate the agent FIFOs named in ``arguments.name_list``.

    If "ALL" appears anywhere in the list, ``repopulate_fifos`` is called
    without names, which processes every mapped FIFO.
    """
    requested = arguments.name_list
    if "ALL" in requested:
        requested = ()
    repopulate_fifos(*requested)
0231 
0232 
def cacher_refresh(arguments):
    """Run the Cacher once with ``force_update=True`` and ``skip_lock=True`` on 4 threads.

    Args:
        arguments: parsed CLI namespace (not read).
    """
    from pandaharvester.harvesterbody.cacher import Cacher
    from pandaharvester.harvestercore.communicator_pool import CommunicatorPool

    Cacher(CommunicatorPool()).execute(force_update=True, skip_lock=True, n_threads=4)
0240 
0241 
def qconf_list(arguments):
    """List queues via queue_config_tool.

    Calls ``list_config_ids`` when ``arguments.all`` is set, otherwise
    ``list_active_queues``.
    """
    from pandaharvester.harvesterscripts import queue_config_tool

    lister = queue_config_tool.list_config_ids if arguments.all else queue_config_tool.list_active_queues
    lister()
0249 
0250 
def qconf_refresh(arguments):
    """Reload the queue configuration through a new QueueConfigMapper.

    Args:
        arguments: parsed CLI namespace; ``refill`` is forwarded as
            ``refill_table`` to ``load_data``.
    """
    from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper

    mapper = QueueConfigMapper()
    mapper._update_last_reload_time()
    # NOTE(review): resetting lastUpdate presumably forces load_data to reload — confirm
    mapper.lastUpdate = None
    mapper.load_data(refill_table=arguments.refill)
0258 
0259 
def qconf_dump(arguments):
    """Dump queue configurations, selected by configID, all active queues, or name.

    Without ``arguments.json`` the queue_config_tool helpers are asked to
    print; with it, the collected attribute dicts are printed as one JSON
    object keyed by queue name.

    Raises:
        TypeError: re-raised unless it is the ``vars()`` failure on an object
            without ``__dict__``, in which case the result is an empty dict.
    """
    from pandaharvester.harvesterscripts import queue_config_tool

    to_print = not arguments.json
    resObj = {}
    try:
        if arguments.id_list:
            dumped = []
            for config_id in arguments.id_list:
                dumped.append(vars(queue_config_tool.dump_queue_with_config_id(config_id, to_print)))
            resObj = {entry.get("queueName"): entry for entry in dumped}
        elif arguments.all:
            active_queues = queue_config_tool.dump_all_active_queues(to_print)
            if active_queues is not None and not to_print:
                resObj = {vars(qm).get("queueName", ""): vars(qm) for qm in active_queues}
        else:
            resObj = {name: vars(queue_config_tool.dump_active_queue(name, to_print)) for name in arguments.queue_list}
    except TypeError as exc:
        # only the vars()-on-object-without-__dict__ failure is tolerated
        if str(exc) != "vars() argument must have __dict__ attribute":
            raise
        resObj = {}
    if arguments.json:
        json_print(resObj)
0283 
0284 
def qconf_purge(arguments):
    """Purge the queue named by ``arguments.queue`` via ``DBProxy.purge_pq``.

    Prints a confirmation when the call returns a truthy value; otherwise
    logs a critical message.
    """
    target_queue = arguments.queue
    purged = DBProxy().purge_pq(target_queue)
    if not purged:
        mainLogger.critical(f"Failed to purge {target_queue} . See panda-db_proxy.log")
        return
    print(f"Purged {target_queue} from harvester DB")
0293 
0294 
def kill_workers(arguments):
    """Mark workers matching the given filters to be killed.

    Each of ``arguments.status``, ``sites``, ``ces`` and ``submissionhosts``
    is passed as a list, or as the string "ALL" when the list is exactly
    ``["ALL"]``. Prints the number returned by
    ``DBProxy.mark_workers_to_kill_by_query``, or logs a critical message
    when that call returns None.
    """

    def _all_or_list(values):
        # a lone "ALL" entry collapses to the wildcard string
        if len(values) == 1 and values[0] == "ALL":
            return "ALL"
        return values

    status_in = _all_or_list(arguments.status)
    computingSite_in = _all_or_list(arguments.sites)
    computingElement_in = _all_or_list(arguments.ces)
    submissionHost_in = _all_or_list(arguments.submissionhosts)
    query = {
        "status": status_in,
        "computingSite": computingSite_in,
        "computingElement": computingElement_in,
        "submissionHost": submissionHost_in,
    }
    n_workers = DBProxy().mark_workers_to_kill_by_query(query)
    if n_workers is None:
        mainLogger.critical("Failed to kill workers. See panda-db_proxy.log")
        return
    print(
        f"Sweeper will soon kill {n_workers} workers, with "
        f"status in {status_in}, "
        f"computingSite in {computingSite_in}, "
        f"computingElement in {computingElement_in}, "
        f"submissionHost in {submissionHost_in}"
    )
0323 
0324 
def query_workers(arguments):
    """Print worker statistics from ``DBProxy.get_worker_stats_full`` as JSON.

    Args:
        arguments: parsed CLI namespace; with ``all`` set the call is made
            without a filter, otherwise ``queue_list`` is passed as
            ``filter_site_list``.
    """
    dbProxy = DBProxy()
    # the former "except TypeError: raise" wrapper was a no-op and is dropped
    if arguments.all:
        res_obj = dbProxy.get_worker_stats_full()
    else:
        res_obj = dbProxy.get_worker_stats_full(filter_site_list=arguments.queue_list)
    json_print(res_obj)
0335 
0336 
def query_jobs(arguments):
    """Print job statistics from ``DBProxy.get_job_stats_full`` as JSON.

    Args:
        arguments: parsed CLI namespace; with ``all`` set the call is made
            without a filter, otherwise ``queue_list`` is passed as
            ``filter_site_list``.
    """
    dbProxy = DBProxy()
    # the former "except TypeError: raise" wrapper was a no-op and is dropped
    if arguments.all:
        res_obj = dbProxy.get_job_stats_full()
    else:
        res_obj = dbProxy.get_job_stats_full(filter_site_list=arguments.queue_list)
    json_print(res_obj)
0347 
0348 
0349 # === Command map =======================================================
0350 
0351 
# Maps the "which" value set by each sub-parser (via set_defaults in main)
# to the command function that main() calls with the parsed arguments.
commandMap = {
    # test commands
    "test": test,
    # get commands
    "get": get,
    # fifo commands
    "fifo_benchmark": fifo_benchmark,
    "fifo_repopulate": fifo_repopulate,
    # cacher commands
    "cacher_refresh": cacher_refresh,
    # qconf commands
    "qconf_list": qconf_list,
    "qconf_dump": qconf_dump,
    "qconf_refresh": qconf_refresh,
    "qconf_purge": qconf_purge,
    # kill commands
    "kill_workers": kill_workers,
    # query commands
    "query_workers": query_workers,
    "query_jobs": query_jobs,
}
0373 
0374 # === Main ======================================================
0375 
0376 
def _build_parser():
    """Build the ``harvester-admin`` argument parser with all of its sub-commands."""
    # main parser
    oparser = argparse.ArgumentParser(prog="harvester-admin", add_help=True)
    subparsers = oparser.add_subparsers()
    oparser.add_argument("-v", "--verbose", "--debug", action="store_true", dest="debug", help="Print more verbose output. (Debug mode !)")

    # test command
    test_parser = subparsers.add_parser("test", help="for testing only")
    test_parser.set_defaults(which="test")

    # get command
    get_parser = subparsers.add_parser("get", help="get attributes of this harvester")
    get_parser.set_defaults(which="get")
    get_parser.add_argument("attribute", type=str, action="store", metavar="<attribute>", choices=get_harvester_attributes(), help="attribute")

    # fifo parser
    fifo_parser = subparsers.add_parser("fifo", help="fifo related")
    fifo_subparsers = fifo_parser.add_subparsers()
    # fifo benchmark command
    fifo_benchmark_parser = fifo_subparsers.add_parser("benchmark", help="benchmark fifo backend")
    fifo_benchmark_parser.set_defaults(which="fifo_benchmark")
    fifo_benchmark_parser.add_argument("-n", type=int, dest="n_objects", action="store", default=500, metavar="<N>", help="Benchmark with N objects")
    fifo_benchmark_parser.add_argument("-t", type=int, dest="n_threads", action="store", default=1, metavar="<N>", help="Benchmark with N threads")
    # fifo repopulate command
    fifo_repopulate_parser = fifo_subparsers.add_parser("repopulate", help="Repopulate agent fifo")
    fifo_repopulate_parser.set_defaults(which="fifo_repopulate")
    fifo_repopulate_parser.add_argument(
        "name_list", nargs="+", type=str, action="store", metavar="<agent_name>", help='Name of agent fifo, e.g. "monitor" ("ALL" for all)'
    )

    # cacher parser
    cacher_parser = subparsers.add_parser("cacher", help="cacher related")
    cacher_subparsers = cacher_parser.add_subparsers()
    # cacher refresh command
    cacher_refresh_parser = cacher_subparsers.add_parser("refresh", help="refresh cacher immediately")
    cacher_refresh_parser.set_defaults(which="cacher_refresh")

    # qconf (queue configuration) parser
    qconf_parser = subparsers.add_parser("qconf", help="queue configuration")
    qconf_subparsers = qconf_parser.add_subparsers()
    # qconf list command
    qconf_list_parser = qconf_subparsers.add_parser("list", help="List queues. Only active queues listed by default")
    qconf_list_parser.set_defaults(which="qconf_list")
    qconf_list_parser.add_argument("-a", "--all", dest="all", action="store_true", help="List name and configID of all queues")
    # qconf dump command
    qconf_dump_parser = qconf_subparsers.add_parser("dump", help="Dump queue configurations")
    qconf_dump_parser.set_defaults(which="qconf_dump")
    qconf_dump_parser.add_argument("-J", "--json", dest="json", action="store_true", help="Dump configuration in JSON format")
    qconf_dump_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Dump configuration of all active queues")
    qconf_dump_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="<queue_name>", help="Name of active queue")
    qconf_dump_parser.add_argument(
        "-i", "--id", dest="id_list", nargs="+", type=int, action="store", metavar="<configID>", help="Dump configuration of queue with configID"
    )
    # qconf refresh command
    qconf_refresh_parser = qconf_subparsers.add_parser("refresh", help="refresh queue configuration immediately")
    qconf_refresh_parser.set_defaults(which="qconf_refresh")
    qconf_refresh_parser.add_argument("-R", "--refill", dest="refill", action="store_true", help="Refill pq_table before refresh (cleaner)")
    # qconf purge command
    qconf_purge_parser = qconf_subparsers.add_parser("purge", help="Purge the queue thoroughly from harvester DB (Be careful !!)")
    qconf_purge_parser.set_defaults(which="qconf_purge")
    qconf_purge_parser.add_argument("queue", type=str, action="store", metavar="<queue_name>", help="Name of panda queue to purge")

    # kill parser
    kill_parser = subparsers.add_parser("kill", help="kill something alive")
    kill_subparsers = kill_parser.add_subparsers()
    # kill workers command
    kill_workers_parser = kill_subparsers.add_parser("workers", help="Kill active workers by query")
    kill_workers_parser.set_defaults(which="kill_workers")
    kill_workers_parser.add_argument(
        "--status",
        nargs="+",
        dest="status",
        action="store",
        metavar="<status>",
        default=["submitted"],
        help='worker status (only "submitted", "idle", "running" are valid)',
    )
    kill_workers_parser.add_argument(
        "--sites", nargs="+", dest="sites", action="store", metavar="<site>", required=True, help='site (computingSite); "ALL" for all sites'
    )
    kill_workers_parser.add_argument(
        "--ces", nargs="+", dest="ces", action="store", metavar="<ce>", required=True, help='CE (computingElement); "ALL" for all CEs'
    )
    kill_workers_parser.add_argument(
        "--submissionhosts",
        nargs="+",
        dest="submissionhosts",
        action="store",
        metavar="<submission_host>",
        required=True,
        help='submission host (submissionHost); "ALL" for all submission hosts',
    )

    # query parser
    query_parser = subparsers.add_parser("query", help="query current status about harvester")
    query_subparsers = query_parser.add_subparsers()
    # query worker_stats command
    query_workers_parser = query_subparsers.add_parser("workers", help="Query statistics of workers in queues")
    query_workers_parser.set_defaults(which="query_workers")
    query_workers_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues")
    query_workers_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="<queue_name>", help="Name of active queue")
    # query job_stats command
    query_jobs_parser = query_subparsers.add_parser("jobs", help="Query statistics of jobs in queues")
    query_jobs_parser.set_defaults(which="query_jobs")
    query_jobs_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues")
    query_jobs_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="<queue_name>", help="Name of active queue")

    return oparser


def main():
    """Parse the command line, run the selected command and exit with its result.

    Prints the help and exits with status 1 when no arguments are given or
    when no runnable command was selected (e.g. a command group without a
    sub-command). ``RuntimeError`` and ``NotImplementedError`` raised by the
    command are logged as critical and turned into exit status 1.
    """
    oparser = _build_parser()

    # start parsing
    if len(sys.argv) == 1:
        oparser.print_help()
        sys.exit(1)
    arguments = oparser.parse_args(sys.argv[1:])
    # log level
    mainLogger.setLevel(logging.DEBUG if arguments.debug else logging.WARNING)
    # "which" is only set by leaf sub-parsers; a missing or unmapped value means
    # there is nothing to run, so show the help instead of failing later
    command = commandMap.get(getattr(arguments, "which", None))
    if command is None:
        oparser.print_help()
        sys.exit(1)
    # Run command functions
    try:
        start_time = time.time()
        result = command(arguments)
        end_time = time.time()
        # lazy %-args: nothing is formatted unless debug logging is enabled
        mainLogger.debug("ARGS: %s ; RESULT: %s ", arguments, result)
        mainLogger.debug("Action completed in %.3f seconds", end_time - start_time)
        sys.exit(result)
    except (RuntimeError, NotImplementedError) as e:
        mainLogger.critical(f"ERROR: {e}")
        sys.exit(1)
0512 
0513 
0514 if __name__ == "__main__":
0515     try:
0516         main()
0517     except KeyboardInterrupt:
0518         mainLogger.critical("Command Interrupted !!")
0519         sys.exit(1)