Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2023
0010 
0011 import traceback
0012 
0013 from idds.common.constants import Sections, RequestStatus
0014 from idds.common.utils import setup_logging
0015 from idds.core import (requests as core_requests,
0016                        messages as core_messages)
0017 from idds.agents.common.baseagent import BaseAgent
0018 
0019 
# Runs at import time, passing this module's name to the project's logging
# helper (presumably this configures the logger for the module -- confirm
# against idds.common.utils.setup_logging).
setup_logging(__name__)
0021 
0022 
class Archiver(BaseAgent):
    """
    Agent that periodically cleans up old messages.

    ``run`` registers one task, ``clean_messages``, with a delay equal to the
    poll period.  That task looks up the last request id that is older than
    ``older_than`` days and whose status is in a fixed list, and hands the id
    to ``core_messages.clean_old_messages``.
    """

    def __init__(self, num_threads=1, poll_period=7, older_than=30, **kwargs):
        """
        Set up the agent.

        :param num_threads: accepted for signature compatibility; the value
                            passed to the base class is always
                            ``self.max_number_workers``.
        :param poll_period: interval in days; a falsy value means 7.
                            Stored on the instance in seconds.
        :param older_than: age threshold in days; a falsy value means 30.
        :param kwargs: forwarded unchanged to the base class constructor.
        """
        # max_number_workers is read right after this call, so it is expected
        # to be populated by set_max_workers() (defined outside this class).
        self.set_max_workers()
        worker_count = self.max_number_workers
        super(Archiver, self).__init__(num_threads=worker_count, name='Archive', **kwargs)

        seconds_per_day = 3600 * 24
        self.poll_period = int(poll_period or 7) * seconds_per_day
        self.older_than = int(older_than or 30)      # days
        self.config_section = Sections.Archiver

    def clean_messages(self):
        """
        Clean messages up to the last sufficiently old request.

        Nothing is cleaned when no request id is returned.  Any exception is
        logged (message and traceback) and not re-raised.
        """
        try:
            eligible_status = [
                RequestStatus.Finished,
                RequestStatus.SubFinished,
                RequestStatus.Failed,
                RequestStatus.Cancelled,
                RequestStatus.Suspended,
                RequestStatus.Expired,
            ]
            last_request_id = core_requests.get_last_request_id(older_than=self.older_than,
                                                                status=eligible_status)
            if not last_request_id:
                # No matching request: leave the messages untouched.
                return

            self.logger.info("cleaning old mesages older than request id %s" % last_request_id)
            core_messages.clean_old_messages(request_id=last_request_id)
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

    def run(self):
        """
        Main run function.

        Initialises thread info and the default tasks, logs the settings,
        registers the ``clean_messages`` task and then calls ``execute``.
        A ``KeyboardInterrupt`` stops the agent.
        """
        try:
            self.logger.info("Starting main thread")
            self.init_thread_info()
            self.add_default_tasks()

            self.logger.info("poll period: %s seconds" % self.poll_period)
            self.logger.info("older_than: %s days" % self.older_than)

            cleanup_task = self.create_task(
                task_func=self.clean_messages,
                task_output_queue=None,
                task_args=(),
                task_kwargs={},
                delay_time=self.poll_period,
                priority=1
            )
            self.add_task(cleanup_task)

            self.execute()
        except KeyboardInterrupt:
            self.stop()
0074 
# Script entry point: build an Archiver with its default arguments and call
# the instance (``__call__`` is not defined in this file -- presumably it is
# inherited from BaseAgent).
if __name__ == "__main__":
    agent = Archiver()
    agent()