Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 import sys
0002 import time
0003 import traceback
0004 
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.thread_utils import GenericThread
0007 from pandaserver.config import panda_config
0008 from pandaserver.taskbuffer.Utils import create_shards
0009 
0010 # module-level logger named "worker_sync"; each WorkerSync instance keeps a reference to it
0011 _logger = PandaLogger().getLogger("worker_sync")
0012 
0013 
def translate_status_to_command(pilot_status):
    """Map a pilot status to the harvester command that should be issued for it.

    :param pilot_status: status reported by the pilot
    :return: "SYNC_WORKERS_ACTIVATE" for "running", "SYNC_WORKERS_KILL" for
             "finished", and None for any other value
    """
    status_command_pairs = (
        ("running", "SYNC_WORKERS_ACTIVATE"),
        ("finished", "SYNC_WORKERS_KILL"),
    )
    # plain equality (not a dict lookup) so unhashable inputs still fall through to None
    for known_status, harvester_command in status_command_pairs:
        if pilot_status == known_status:
            return harvester_command
    return None
0020 
0021 
class WorkerSync:
    """Send harvester commands for workers whose harvester state lags behind the pilot state."""

    def __init__(self, tbuf):
        """Store the task buffer handle and attach the module-level logger.

        :param tbuf: task buffer object; ``run`` calls its
                     ``get_workers_to_synchronize`` and ``commandToHarvester`` methods
        """
        self.tbuf = tbuf
        self._logger = _logger

    def run(self):
        """
        Identify workers with stale harvester states and newer pilot states, and
        issue the matching harvester command for them shard by shard.

        Any exception raised while fetching or processing the workers is logged
        with its traceback and not re-raised.

        :return: None
        """
        # timing
        time_start = time.time()
        self._logger.debug("Start.")

        # fixed arguments passed along with every harvester command
        status = "new"
        ack_requested = False
        lock_interval = None
        com_interval = None

        try:
            workers_by_harvester = self.tbuf.get_workers_to_synchronize()
            for harvester_id in workers_by_harvester:
                workers_by_status = workers_by_harvester[harvester_id]
                for pilot_status in workers_by_status:
                    command = translate_status_to_command(pilot_status)
                    if not command:
                        # no command is defined for this pilot status
                        continue
                    # one command per shard produced by create_shards(..., 100)
                    for worker_shard in create_shards(workers_by_status[pilot_status], 100):
                        self._logger.debug(f"Processing harvester_id={harvester_id} pilot_status={pilot_status}. Workers to update: {worker_shard}")
                        self.tbuf.commandToHarvester(
                            harvester_id,
                            command,
                            ack_requested,
                            status,
                            lock_interval,
                            com_interval,
                            worker_shard,
                        )
        except Exception:
            self._logger.error(traceback.format_exc())

        # timing
        time_stop = time.time()
        self._logger.debug(f"Done. Worker sync took: {time_stop - time_start} s")
0070 
0071 
0072 # main
def main(tbuf=None, **kwargs):
    """Run one worker synchronization pass.

    :param tbuf: task buffer to reuse. When None, the shared ``taskBuffer`` is
                 imported and initialised here with a single DB connection, and
                 cleaned up again before returning.
    :param kwargs: accepted for call compatibility; not used
    :return: None
    """
    # remember whether this call owns the task buffer so only then it is cleaned up
    owns_task_buffer = tbuf is None

    # instantiate TB
    if owns_task_buffer:
        from pandaserver.taskbuffer.TaskBuffer import taskBuffer

        requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
        taskBuffer.init(
            panda_config.dbhost,
            panda_config.dbpasswd,
            nDBConnection=1,
            useTimeout=True,
            requester=requester_id,
        )
    else:
        taskBuffer = tbuf

    try:
        # run
        WorkerSync(tbuf=taskBuffer).run()
    finally:
        # stop taskBuffer if created inside this script; done in ``finally`` so the
        # cleanup also happens when run() propagates (e.g. KeyboardInterrupt/SystemExit,
        # which run()'s ``except Exception`` does not catch)
        if owns_task_buffer:
            taskBuffer.cleanup(requester=requester_id)
0093 
0094 
0095 # script entry point: call main() with its defaults only when this file is executed directly
0096 if __name__ == "__main__":
0097     main()