File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import traceback
0012
0013 from idds.common.constants import Sections, RequestStatus
0014 from idds.common.utils import setup_logging
0015 from idds.core import (requests as core_requests,
0016 messages as core_messages)
0017 from idds.agents.common.baseagent import BaseAgent
0018
0019
# Module-level logging setup, executed once when this module is imported.
# NOTE(review): what setup_logging configures is defined in idds.common.utils
# and is not visible here.
setup_logging(__name__)
0021
0022
class Archiver(BaseAgent):
    """
    Agent that cleans up old messages.

    ``run`` registers ``clean_messages`` as a task with a delay of
    ``poll_period`` seconds. Each invocation looks up a request id via
    ``core_requests.get_last_request_id`` and, when one is returned, passes it
    to ``core_messages.clean_old_messages``.
    """

    def __init__(self, num_threads=1, poll_period=7, older_than=30, **kwargs):
        """
        Set up the agent.

        :param num_threads: Accepted for interface compatibility; the value
                            actually handed to the base class is
                            ``self.max_number_workers`` (set up by
                            ``set_max_workers``).
        :param poll_period: Polling interval in days; a falsy value falls back
                            to 7. Stored on the instance in seconds.
        :param older_than: Age threshold (logged as days); a falsy value falls
                           back to 30. Stored as an int.
        :param kwargs: Forwarded unchanged to ``BaseAgent.__init__``.
        """
        # The worker count must be resolved before the base initialiser runs,
        # because it replaces the caller-supplied num_threads.
        self.set_max_workers()
        worker_count = self.max_number_workers
        super(Archiver, self).__init__(num_threads=worker_count, name='Archive', **kwargs)

        seconds_per_day = 3600 * 24
        self.poll_period = int(poll_period or 7) * seconds_per_day
        self.older_than = int(older_than or 30)
        self.config_section = Sections.Archiver

    def clean_messages(self):
        """
        Clean messages up to the last sufficiently old request.

        Any exception is logged (message plus traceback) and swallowed so that
        a failing cleanup does not propagate to the caller.
        """
        try:
            # Request states passed as the filter when looking up the request id.
            archivable_states = [
                RequestStatus.Finished,
                RequestStatus.SubFinished,
                RequestStatus.Failed,
                RequestStatus.Cancelled,
                RequestStatus.Suspended,
                RequestStatus.Expired,
            ]
            last_id = core_requests.get_last_request_id(older_than=self.older_than,
                                                        status=archivable_states)
            if not last_id:
                # Nothing qualifies yet; skip the cleanup call entirely.
                return
            self.logger.info("cleaning old mesages older than request id %s" % last_id)
            core_messages.clean_old_messages(request_id=last_id)
        except Exception as err:
            self.logger.error(err)
            self.logger.error(traceback.format_exc())

    def run(self):
        """
        Main entry point: register the cleanup task and start executing.

        A ``KeyboardInterrupt`` raised during start-up or execution stops the
        agent via ``self.stop()``.
        """
        try:
            self.logger.info("Starting main thread")
            self.init_thread_info()

            self.add_default_tasks()

            self.logger.info("poll period: %s seconds" % self.poll_period)
            self.logger.info("older_than: %s days" % self.older_than)

            # Register the message cleanup with the configured poll period
            # as its delay.
            cleanup_task = self.create_task(
                task_func=self.clean_messages,
                task_output_queue=None,
                task_args=(),
                task_kwargs={},
                delay_time=self.poll_period,
                priority=1,
            )
            self.add_task(cleanup_task)

            self.execute()
        except KeyboardInterrupt:
            self.stop()
0073
0074
if __name__ == "__main__":
    # Script entry point: build an Archiver with its default settings and
    # invoke the instance.
    agent = Archiver()
    agent()