Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import sys
0002 import time
0003 import traceback
0004 
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.thread_utils import GenericThread
0007 
0008 from pandaserver.config import panda_config
0009 
0010 _logger = PandaLogger().getLogger("pilot_streaming")
0011 
0012 
class PilotStreaming:
    """
    One pass of the unified pilot streaming (UPS) logic: per UPS queue, ask the task buffer for a new
    worker distribution and store the resulting commands for each harvester instance
    """

    def __init__(self, tbuf):
        """
        Keep hold of the task buffer and the module level logger
        :param tbuf: task buffer object that provides the ups_* methods and commandToHarvester
        """
        self.tbuf = tbuf
        self._logger = _logger

    def run(self):
        """
        Walks through the ups queues, works out the worker requirements for each of them and hands
        these to Harvester through the command interface. Queues without worker stats are skipped and
        a failure while handling one queue is logged without stopping the remaining queues
        :return:
        """
        log = self._logger
        task_buffer = self.tbuf

        # start the clock
        started_at = time.time()
        log.debug("Start.")

        # the unified pilot streaming (ups) queues to process
        queue_names = task_buffer.ups_get_queues()
        log.debug(f"UPS queues: {queue_names}")

        # worker stats for all queues, read once up front
        all_worker_stats = task_buffer.ups_load_worker_stats()

        for queue_name in queue_names:
            # pick out the stats that belong to this queue
            try:
                queue_worker_stats = all_worker_stats[queue_name]
                log.debug(f"worker_stats for queue {queue_name}: {queue_worker_stats}")
            except KeyError:
                # nothing known about this queue, move on to the next one
                log.debug(f"No worker stats for queue {queue_name}")
                continue

            # work out the new distribution and store one command per harvester instance
            try:
                distribution = task_buffer.ups_new_worker_distribution(queue_name, queue_worker_stats)
                log.info(f"queue: {queue_name}, results: {distribution}")

                command = f"SET_N_WORKERS_JOBTYPE:{queue_name}"
                for harvester_id, params in distribution.items():
                    # positional arguments after the command: ack_requested, status, lock_interval, com_interval
                    task_buffer.commandToHarvester(harvester_id, command, False, "new", None, None, params)
            except Exception:
                # keep going with the other queues, but leave the full traceback in the log
                log.error(traceback.format_exc())

        # report how long the whole pass took
        elapsed = time.time() - started_at
        log.debug(f"Done. Pilot streaming took: {elapsed} s")
0079 
0080 
0081 # main
def main(tbuf=None, **kwargs):
    """
    Entry point: run one pilot streaming pass

    When no task buffer is passed in, the shared taskBuffer is imported and initialised with the
    database settings from panda_config, and it is cleaned up again when the pass ends, including
    when the pass raises. A task buffer supplied by the caller is used as is and is not cleaned up here.

    :param tbuf: task buffer to use, or None to initialise one inside this function
    :param kwargs: accepted but not used by this function
    :return:
    """
    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)

    # remember whether the task buffer is ours, since only then are we responsible for the cleanup
    owns_task_buffer = tbuf is None

    # instantiate TB
    if owns_task_buffer:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester_id,
        )
    else:
        taskBuffer = tbuf

    try:
        # run the pilot streaming logic
        PilotStreaming(tbuf=taskBuffer).run()
    finally:
        # stop the taskBuffer if it was created inside this script, also when the run failed
        if owns_task_buffer:
            taskBuffer.cleanup(requester=requester_id)
0105 
0106 
# run a single pilot streaming pass when the module is executed as a script;
# main() is called without a task buffer, so it sets one up and cleans it up itself
if __name__ == "__main__":
    main()