Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:18

0001 #!/usr/bin/env python
0002 #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020
0010 
import traceback

try:
    # python 3
    from queue import Empty, Queue
except ImportError:
    # Python 2
    from Queue import Empty, Queue

from idds.common.constants import (Sections, WorkprogressStatus, WorkprogressLocking, TransformStatus)
from idds.common.utils import setup_logging
from idds.core import (workprogress as core_workprogress, transforms as core_transforms)
from idds.workflow.work import WorkStatus
from idds.agents.common.baseagent import BaseAgent
0025 
# Module-level call to the iDDS setup_logging helper with this module's name;
# it runs once, at import time.
setup_logging(__name__)
0027 
0028 
class Marshaller(BaseAgent):
    """
    Marshaller works to organize workflows and activate the current works of a
    workflow as transforms of its workprogress.

    It schedules three pipelines as agent tasks (see ``run``):

    * new workprogresses (New/Ready/Extend): create transforms for the
      workflow's new works and move the workprogress to Transforming;
    * running workprogresses (Transforming/ToCancel/Cancelling): create
      transforms for newly available works, sync work statuses from their
      transforms, request cancellation of unfinished transforms when asked to,
      and derive the workprogress status from the workflow;
    * periodic cleaning of workprogress locks.
    """

    def __init__(self, num_threads=1, poll_time_period=1800, retrieve_bulk_size=10, **kwargs):
        """
        :param num_threads: number of processing tasks started per pipeline
                            (also forwarded to BaseAgent).
        :param poll_time_period: passed as ``period`` when polling running
                                 workprogresses (presumably seconds — confirm).
        :param retrieve_bulk_size: passed as ``bulk_size`` when fetching workprogresses.
        :param kwargs: forwarded to BaseAgent.
        """
        super(Marshaller, self).__init__(num_threads=num_threads, **kwargs)
        self.config_section = Sections.Marshaller
        self.poll_time_period = int(poll_time_period)
        self.retrieve_bulk_size = int(retrieve_bulk_size)

        # Poller -> processor -> finisher queues of the two workprogress pipelines.
        self.new_task_queue = Queue()
        self.new_output_queue = Queue()
        self.running_task_queue = Queue()
        self.running_output_queue = Queue()

    @staticmethod
    def _drain(task_queue):
        """
        Yield the items currently available in ``task_queue`` without blocking.

        Up to ``num_threads`` processing tasks consume the same queue (see ``run``).
        With ``while not q.empty(): q.get()``, another consumer can take the last
        item between the check and the blocking ``get()``, which leaves this
        consumer stuck. A non-blocking ``get`` stops cleanly on ``Empty`` instead.
        """
        while True:
            try:
                item = task_queue.get(block=False)
            except Empty:
                return
            yield item

    @staticmethod
    def _build_new_transforms(workprogress, wf):
        """
        Build the parameters of one new transform per new work of ``wf``.

        Each work is copied and the workflow proxy is added to the copy. The
        transform metadata keeps both the original work and the copy.

        :param workprogress: workprogress dict the transforms belong to.
        :param wf: the workflow stored in the workprogress metadata.
        :returns: list of transform parameter dicts (status New, 0 retries).
        """
        transforms = []
        for work in wf.get_new_works():
            new_work = work.copy()
            new_work.add_proxy(wf.get_proxy())
            transforms.append({'workprogress_id': workprogress['workprogress_id'],
                               'request_id': workprogress['request_id'],
                               'workload_id': workprogress['workload_id'],
                               'transform_type': work.get_work_type(),
                               'transform_tag': work.get_work_tag(),
                               'priority': workprogress['priority'],
                               'status': TransformStatus.New,
                               'retries': 0,
                               'expired_at': workprogress['expired_at'],
                               # 'orginal_work' (sic): the spelling is kept unchanged because
                               # other code may read this key back from the metadata.
                               'transform_metadata': {'orginal_work': work, 'work': new_work}})
        return transforms

    def get_new_workprogresses(self):
        """
        Get new workprogresses (New, Ready or Extend) to process.

        The query is issued with ``locking=True``.
        """
        workprogress_status = [WorkprogressStatus.New, WorkprogressStatus.Ready, WorkprogressStatus.Extend]
        workprogresses_new = core_workprogress.get_workprogresses_by_status(status=workprogress_status,
                                                                            locking=True,
                                                                            bulk_size=self.retrieve_bulk_size)

        self.logger.debug("Main thread get %s New+Ready+Extend workprogresses to process" % len(workprogresses_new))
        if workprogresses_new:
            self.logger.info("Main thread get %s New+Ready+Extend workprogresses to process" % len(workprogresses_new))
        return workprogresses_new

    def process_new_workprogress(self, workprogress):
        """
        Create transforms for the new works of a new workprogress.

        :param workprogress: workprogress dict whose
                             ``workprogress_metadata['workflow']`` holds the workflow.
        :returns: dict with the updated ``workprogress`` (status Transforming,
                  locking Idle) and its ``new_transforms``.
        """
        wf = workprogress['workprogress_metadata']['workflow']
        transforms = self._build_new_transforms(workprogress, wf)

        self.logger.info("Processing workprogress(%s): new transforms: %s" % (workprogress['workprogress_id'],
                                                                              transforms))

        workprogress['locking'] = WorkprogressLocking.Idle
        workprogress['status'] = WorkprogressStatus.Transforming
        return {'workprogress': workprogress, 'new_transforms': transforms}

    def process_new_workprogresses(self):
        """
        Process every workprogress currently queued in ``new_task_queue``.

        Errors are logged per workprogress so one bad entry does not stop the rest.

        :returns: list of the results of ``process_new_workprogress``.
        """
        ret = []
        for workprogress in self._drain(self.new_task_queue):
            if not workprogress:
                continue
            try:
                self.logger.info("Main thread processing new workprogress: %s" % workprogress)
                ret_workprogress = self.process_new_workprogress(workprogress)
                if ret_workprogress:
                    ret.append(ret_workprogress)
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return ret

    def finish_new_workprogresses(self):
        """
        Persist the processed new workprogresses.

        Each queued item is written back with its status, locking and metadata,
        together with its new transforms.
        """
        for ret in self._drain(self.new_output_queue):
            # Skip empty results before reading any of their keys.
            if not ret:
                continue
            try:
                wp = ret['workprogress']
                self.logger.info("Main thread finishing new workprogress: %s" % wp)
                wp_parameters = {'status': wp['status'],
                                 'locking': wp['locking'],
                                 'workprogress_metadata': wp['workprogress_metadata']}
                core_workprogress.update_workprogress(workprogress_id=wp['workprogress_id'],
                                                      parameters=wp_parameters,
                                                      new_transforms=ret['new_transforms'])
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())

    def get_running_workprogresses(self):
        """
        Get running workprogresses (Transforming, ToCancel or Cancelling) to process.

        The query is issued with ``locking=True`` and ``period=self.poll_time_period``.
        """
        workprogress_status = [WorkprogressStatus.Transforming, WorkprogressStatus.ToCancel,
                               WorkprogressStatus.Cancelling]
        workprogresses = core_workprogress.get_workprogresses_by_status(status=workprogress_status,
                                                                        period=self.poll_time_period,
                                                                        locking=True,
                                                                        bulk_size=self.retrieve_bulk_size)

        self.logger.debug("Main thread get %s progressing workprogresses to process" % len(workprogresses))
        if workprogresses:
            self.logger.info("Main thread get %s progressing workprogresses to process" % len(workprogresses))
        return workprogresses

    def process_running_workprogress(self, workprogress):
        """
        Process one running workprogress.

        - Transforming: create transforms for the workflow's new works.
        - Every status: refresh the status of the current works from their transforms.
        - ToCancel: request cancellation of every current transform whose work is
          not already finished, failed or being cancelled.
        - The workprogress status is derived from the workflow once it is terminated.
          Until then, a ToCancel or Cancelling workprogress stays Cancelling (so no
          new works are started for it) and any other one stays Transforming.

        :returns: dict with ``workprogress_id``, update ``parameters``,
                  ``new_transforms`` and ``update_transforms`` (keyed by work id).
        """
        self.logger.info("process_running_workprogress: workprogress_id: %s" % workprogress['workprogress_id'])
        workprogress_metadata = workprogress['workprogress_metadata']
        wf = workprogress_metadata['workflow']
        status = workprogress['status']

        new_transforms = []
        if status in [WorkprogressStatus.Transforming]:
            new_transforms = self._build_new_transforms(workprogress, wf)
            self.logger.info("Processing workprogress(%s): new transforms: %s" % (workprogress['workprogress_id'],
                                                                                  new_transforms))

        # Sync the current works with the stored transform statuses first, so the
        # cancellation decision below and the termination check use fresh statuses.
        for work in wf.get_current_works():
            tf = core_transforms.get_transform(transform_id=work.get_work_id())
            work.set_status(WorkStatus(tf['status'].value))
            work.set_terminated_msg(msg=None)   # TODO

        update_transforms = {}
        if status in [WorkprogressStatus.ToCancel]:
            for work in wf.get_current_works():
                if work.get_status() not in [WorkStatus.Finished, WorkStatus.SubFinished,
                                             WorkStatus.Failed, WorkStatus.Cancelling,
                                             WorkStatus.Cancelled]:
                    update_transforms[work.get_work_id()] = {'status': TransformStatus.ToCancel}

        if wf.is_terminated():
            if wf.is_finished():
                wp_status = WorkprogressStatus.Finished
            elif wf.is_subfinished():
                wp_status = WorkprogressStatus.SubFinished
            elif wf.is_failed():
                wp_status = WorkprogressStatus.Failed
            elif wf.is_cancelled():
                wp_status = WorkprogressStatus.Cancelled
            else:
                wp_status = WorkprogressStatus.Failed
            wp_msg = wf.get_terminated_msg()
        elif status in [WorkprogressStatus.ToCancel, WorkprogressStatus.Cancelling]:
            # Do not fall back to Transforming: that would make the next cycle
            # create new transforms for a workprogress being cancelled.
            wp_status = WorkprogressStatus.Cancelling
            wp_msg = None
        else:
            wp_status = WorkprogressStatus.Transforming
            wp_msg = None

        parameters = {'status': wp_status,
                      'locking': WorkprogressLocking.Idle,
                      'workprogress_metadata': workprogress_metadata,
                      'errors': {'msg': wp_msg}}
        return {'workprogress_id': workprogress['workprogress_id'],
                'parameters': parameters,
                'new_transforms': new_transforms,
                'update_transforms': update_transforms}

    def process_running_workprogresses(self):
        """
        Process every workprogress currently queued in ``running_task_queue``.

        Errors are logged per workprogress so one bad entry does not stop the rest.

        :returns: list of the results of ``process_running_workprogress``.
        """
        ret = []
        for workprogress in self._drain(self.running_task_queue):
            if not workprogress:
                continue
            try:
                self.logger.info("Main thread processing running workprogress: %s" % workprogress)
                ret_workprogress = self.process_running_workprogress(workprogress)
                if ret_workprogress:
                    ret.append(ret_workprogress)
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return ret

    def finish_running_workprogresses(self):
        """
        Persist the processed running workprogresses together with their new and
        updated transforms.
        """
        for ret in self._drain(self.running_output_queue):
            # Skip empty results before reading any of their keys.
            if not ret:
                continue
            try:
                self.logger.info("Main thread finishing processing workprogress: %s" % ret)
                core_workprogress.update_workprogress(workprogress_id=ret['workprogress_id'],
                                                      parameters=ret['parameters'],
                                                      new_transforms=ret['new_transforms'],
                                                      update_transforms=ret['update_transforms'])
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())

    def clean_locks(self):
        """Clean workprogress locks via ``core_workprogress.clean_locking``."""
        self.logger.info("clean locking")
        core_workprogress.clean_locking()

    def run(self):
        """
        Main run function.

        Registers the tasks of both workprogress pipelines and the lock cleaner,
        then hands control to ``execute()``. A KeyboardInterrupt stops the agent.
        """
        try:
            self.logger.info("Starting main thread")

            self.load_plugins()

            self.add_default_tasks()

            # New workprogresses: poll -> process (num_threads tasks) -> persist.
            task = self.create_task(task_func=self.get_new_workprogresses, task_output_queue=self.new_task_queue,
                                    task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
            self.add_task(task)
            for _ in range(self.num_threads):
                task = self.create_task(task_func=self.process_new_workprogresses,
                                        task_output_queue=self.new_output_queue,
                                        task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
                self.add_task(task)
            task = self.create_task(task_func=self.finish_new_workprogresses, task_output_queue=None,
                                    task_args=tuple(), task_kwargs={}, delay_time=2, priority=1)
            self.add_task(task)

            # Running workprogresses: poll -> process (num_threads tasks) -> persist.
            task = self.create_task(task_func=self.get_running_workprogresses,
                                    task_output_queue=self.running_task_queue,
                                    task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
            self.add_task(task)
            for _ in range(self.num_threads):
                task = self.create_task(task_func=self.process_running_workprogresses,
                                        task_output_queue=self.running_output_queue,
                                        task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
                self.add_task(task)
            task = self.create_task(task_func=self.finish_running_workprogresses, task_output_queue=None,
                                    task_args=tuple(), task_kwargs={}, delay_time=1, priority=1)
            self.add_task(task)

            task = self.create_task(task_func=self.clean_locks, task_output_queue=None,
                                    task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
            self.add_task(task)

            self.execute()
        except KeyboardInterrupt:
            self.stop()
0289 
0290 
if __name__ == '__main__':
    # Script entry point: build a Marshaller with default arguments and invoke it.
    Marshaller()()