File indexing completed on 2026-04-10 08:39:01
0001 import sys
0002 import time
0003 import traceback
0004
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.thread_utils import GenericThread
0007
0008 from pandaserver.config import panda_config
0009
0010 _logger = PandaLogger().getLogger("pilot_streaming")
0011
0012
0013 class PilotStreaming(object):
0014 def __init__(self, tbuf):
0015 self._logger = _logger
0016 self.tbuf = tbuf
0017 return
0018
0019 def run(self):
0020 """
0021 Gets and iterates over ups queues, deciding the job requirements and sending these to Harvester
0022 via the command interface
0023 :return:
0024 """
0025
0026
0027 time_start = time.time()
0028 self._logger.debug("Start.")
0029
0030
0031 ups_queues = self.tbuf.ups_get_queues()
0032 self._logger.debug(f"UPS queues: {ups_queues}")
0033
0034
0035 worker_stats = self.tbuf.ups_load_worker_stats()
0036
0037
0038 for ups_queue in ups_queues:
0039
0040 try:
0041 tmp_worker_stats = worker_stats[ups_queue]
0042 self._logger.debug(f"worker_stats for queue {ups_queue}: {tmp_worker_stats}")
0043 except KeyError:
0044
0045 self._logger.debug(f"No worker stats for queue {ups_queue}")
0046 continue
0047
0048
0049 try:
0050 new_workers_per_harvester = self.tbuf.ups_new_worker_distribution(ups_queue, tmp_worker_stats)
0051 self._logger.info(f"queue: {ups_queue}, results: {new_workers_per_harvester}")
0052
0053
0054 command = f"SET_N_WORKERS_JOBTYPE:{ups_queue}"
0055 status = "new"
0056 ack_requested = False
0057 lock_interval = None
0058 com_interval = None
0059
0060 for harvester_id in new_workers_per_harvester:
0061 params = new_workers_per_harvester[harvester_id]
0062 self.tbuf.commandToHarvester(
0063 harvester_id,
0064 command,
0065 ack_requested,
0066 status,
0067 lock_interval,
0068 com_interval,
0069 params,
0070 )
0071 except Exception:
0072 self._logger.error(traceback.format_exc())
0073
0074
0075 time_stop = time.time()
0076 self._logger.debug(f"Done. Pilot streaming took: {time_stop - time_start} s")
0077
0078 return
0079
0080
0081
0082 def main(tbuf=None, **kwargs):
0083 requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
0084
0085
0086 if tbuf is None:
0087 from pandaserver.taskbuffer.TaskBuffer import taskBuffer
0088
0089 taskBuffer.init(
0090 panda_config.dbhost,
0091 panda_config.dbpasswd,
0092 nDBConnection=1,
0093 useTimeout=True,
0094 requester=requester_id,
0095 )
0096 else:
0097 taskBuffer = tbuf
0098
0099
0100 PilotStreaming(tbuf=taskBuffer).run()
0101
0102
0103 if tbuf is None:
0104 taskBuffer.cleanup(requester=requester_id)
0105
0106
0107
0108 if __name__ == "__main__":
0109 main()