File indexing completed on 2026-04-10 08:39:01
0001 import sys
0002 import time
0003 import traceback
0004
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.thread_utils import GenericThread
0007 from pandaserver.config import panda_config
0008 from pandaserver.taskbuffer.Utils import create_shards
0009
0010
# Module-level logger for this script; WorkerSync instances pick it up as self._logger.
_logger = PandaLogger().getLogger("worker_sync")
0012
0013
def translate_status_to_command(pilot_status):
    """
    Map a pilot status onto the harvester command that brings the worker in sync.

    :param pilot_status: status reported by the pilot for the worker
    :return: "SYNC_WORKERS_ACTIVATE" for "running", "SYNC_WORKERS_KILL" for
        "finished", and None for any other status (no command to send)
    """
    for known_status, command in (
        ("running", "SYNC_WORKERS_ACTIVATE"),
        ("finished", "SYNC_WORKERS_KILL"),
    ):
        if pilot_status == known_status:
            return command
    return None
0020
0021
class WorkerSync:
    """
    Send synchronization commands to harvesters whose worker states are stale.

    Workers whose harvester-side state is older than the state reported by the
    pilot are obtained from ``tbuf.get_workers_to_synchronize()``. For every
    harvester and pilot status that maps to a command (see
    ``translate_status_to_command``), the workers are sent to the harvester in
    shards via ``tbuf.commandToHarvester``.
    """

    def __init__(self, tbuf, shard_size=100):
        """
        :param tbuf: task buffer providing ``get_workers_to_synchronize`` and
            ``commandToHarvester``
        :param shard_size: maximum number of workers attached to a single
            harvester command (defaults to 100, the previously hard-coded value)
        """
        self._logger = _logger
        self.tbuf = tbuf
        self.shard_size = shard_size

    def run(self):
        """
        Identify workers with stale harvester states and newer pilot states, and
        send the matching command to each affected harvester.

        Errors are logged and never raised. If fetching the stale workers fails,
        the whole pass is aborted. If processing one harvester fails, the error
        is logged with its harvester_id and the remaining harvesters are still
        processed.
        :return: None
        """
        # Use a monotonic clock so wall-clock adjustments cannot distort the reported duration.
        time_start = time.monotonic()
        self._logger.debug("Start.")

        try:
            # presumably {harvester_id: {pilot_status: workers}}, judging by how it is consumed
            stale_workers_per_harvester = self.tbuf.get_workers_to_synchronize()
            for harvester_id, workers_per_status in stale_workers_per_harvester.items():
                try:
                    self._sync_harvester(harvester_id, workers_per_status)
                except Exception:
                    # isolate failures so one harvester cannot block synchronization of the others
                    self._logger.error(f"Failed to synchronize workers for harvester_id={harvester_id}: {traceback.format_exc()}")
        except Exception:
            self._logger.error(traceback.format_exc())

        time_stop = time.monotonic()
        self._logger.debug(f"Done. Worker sync took: {time_stop - time_start} s")

    def _sync_harvester(self, harvester_id, workers_per_status):
        """Send the synchronization commands for one harvester, one shard of workers at a time."""
        # Command settings shared by every synchronization command sent here.
        status = "new"
        ack_requested = False
        lock_interval = None
        com_interval = None

        for pilot_status, workers in workers_per_status.items():
            command = translate_status_to_command(pilot_status)
            if not command:
                # pilot states without a corresponding harvester command are skipped
                continue
            for worker_shard in create_shards(workers, self.shard_size):
                self._logger.debug(f"Processing harvester_id={harvester_id} pilot_status={pilot_status}. Workers to update: {worker_shard}")
                self.tbuf.commandToHarvester(
                    harvester_id,
                    command,
                    ack_requested,
                    status,
                    lock_interval,
                    com_interval,
                    worker_shard,
                )
0070
0071
0072
def main(tbuf=None, **kwargs):
    """
    Run a single worker synchronization pass.

    :param tbuf: an already initialized task buffer to use. When None, the
        ``taskBuffer`` object from ``pandaserver.taskbuffer.TaskBuffer`` is
        initialized here with one DB connection and is cleaned up again when
        the pass ends.
    :param kwargs: accepted but unused (presumably passed by a calling
        framework — verify against callers)
    :return: None
    """
    if tbuf is not None:
        # caller owns the task buffer: just run the pass, no init/cleanup here
        WorkerSync(tbuf=tbuf).run()
        return

    from pandaserver.taskbuffer.TaskBuffer import taskBuffer

    requester_id = GenericThread().get_full_id(__name__, sys.modules[__name__].__file__)
    taskBuffer.init(
        panda_config.dbhost,
        panda_config.dbpasswd,
        nDBConnection=1,
        useTimeout=True,
        requester=requester_id,
    )
    try:
        WorkerSync(tbuf=taskBuffer).run()
    finally:
        # undo the init above even if the pass is interrupted (e.g. KeyboardInterrupt)
        taskBuffer.cleanup(requester=requester_id)
0093
0094
0095
if __name__ == "__main__":
    # Script entry point: run one synchronization pass, letting main() set up its own task buffer.
    main()