Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # you may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 import traceback
0012 
0013 from idds.common import exceptions
0014 from idds.common.constants import (ProcessingStatus, ProcessingLocking, ReturnCode,
0015                                    Terminated_processing_status)
0016 from idds.common.utils import setup_logging, truncate_string
0017 from idds.core import processings as core_processings
0018 from idds.agents.common.baseagent import BaseAgent
0019 from idds.agents.common.eventbus.event import (EventType,
0020                                                TerminatedProcessingEvent,
0021                                                SyncProcessingEvent)
0022 
0023 from .utils import (handle_trigger_processing,
0024                     is_process_terminated)
0025 from .poller import Poller
0026 
setup_logging(__name__)


class Trigger(Poller):
    """
    Agent that triggers processings in ToTrigger/Triggering status (to release jobs).

    Work reaches the agent in two ways:

    * a scheduled task (``get_trigger_processings``) that fetches and locks
      ToTrigger/Triggering processings and submits them to the worker pool;
    * event-bus events, dispatched through ``event_func_map``:
      ``EventType.TriggerProcessing`` and ``EventType.MsgTriggerProcessing``.

    After a processing is handled, a ``TerminatedProcessingEvent`` (when it is
    terminating) or a ``SyncProcessingEvent`` (otherwise) is sent on the event bus.
    """

    def __init__(self, num_threads=1, trigger_max_number_workers=None, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
                 name='Trigger', use_process_pool=False, message_bulk_size=1000, max_updates_per_round=2000, **kwargs):
        """
        :param num_threads: NOTE(review): ignored; the thread count is always taken
            from the resolved max_number_workers below.
        :param trigger_max_number_workers: if truthy, overrides max_number_workers.
        :param max_number_workers: worker limit used when trigger_max_number_workers is not set.
        :param poll_period, retries, message_bulk_size: NOTE(review): accepted but not
            forwarded to Poller, so Poller's own defaults apply. Confirm this is intended.
        :param retrieve_bulk_size: bulk size used when fetching processings from the DB.
        :param max_updates_per_round: forwarded to the trigger handler on each call.
        """
        if trigger_max_number_workers:
            self.max_number_workers = int(trigger_max_number_workers)
        else:
            self.max_number_workers = int(max_number_workers)

        # NOTE(review): runs before the base-class __init__; it presumably only
        # depends on self.max_number_workers. Confirm in BaseAgent.
        self.set_max_workers()

        num_threads = int(self.max_number_workers)
        super(Trigger, self).__init__(num_threads=num_threads, name=name, max_number_workers=self.max_number_workers,
                                      max_updates_per_round=max_updates_per_round, use_process_pool=use_process_pool,
                                      retrieve_bulk_size=retrieve_bulk_size, **kwargs)
        self.logger.info("num_threads: %s" % num_threads)

        self.max_updates_per_round = int(max_updates_per_round)
        self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round)

        # A trigger_max_number_workers attribute set during base-class initialisation
        # (presumably from agent configuration) takes precedence as the worker limit.
        if hasattr(self, 'trigger_max_number_workers'):
            self.max_number_workers = int(self.trigger_max_number_workers)
        # Number of workers currently handling MsgTriggerProcessing events.
        self.number_msg_workers = 0

    def is_ok_to_run_more_msg_processings(self):
        """Pre-check for MsgTriggerProcessing events: True while a free worker is available."""
        return self.get_num_free_workers() > 0

    def get_trigger_processings(self):
        """
        Fetch ToTrigger/Triggering processings and submit each one to the worker pool.

        Processings are fetched with ``locking=True`` and ``update_poll=True``, in bulks of
        ``self.retrieve_bulk_size``, filtered by ``BaseAgent.min_request_id``.

        :returns: the list of submitted processings. Returns ``[]`` when there is no free
            capacity, when ``BaseAgent.min_request_id`` is not set yet, or after a
            database error (which is logged, not raised).
        """
        try:
            self.show_queue_size()
            if not self.is_ok_to_run_more_processings():
                return []

            # NOTE(review): min_request_id is presumably populated by
            # load_min_request_id (scheduled in run()); nothing is fetched until then.
            if BaseAgent.min_request_id is None:
                return []

            processing_status = [ProcessingStatus.ToTrigger, ProcessingStatus.Triggering]
            processings = core_processings.get_processings_by_status(status=processing_status,
                                                                     locking=True, update_poll=True,
                                                                     min_request_id=BaseAgent.min_request_id,
                                                                     bulk_size=self.retrieve_bulk_size)
            if processings:
                processing_ids = [pr['processing_id'] for pr in processings]
                self.logger.info("Main thread get [ToTrigger, Triggering] processings to process: %s" % (str(processing_ids)))

            for pr in processings:
                self.submit(self.process_trigger_processing, processing=pr)

            return processings
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                # Oracle deadlock: only a warning is logged for this case.
                self.logger.warning("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return []

    def _build_error_update(self, processing, ex):
        """
        Build the 'update_processing' dict written back after triggering a processing failed.

        The retry counter is incremented. The status goes back to Running while retries
        remain, or when no max_update_retries is set; otherwise it becomes Failed. The
        exception text, truncated to 200 characters, is stored under errors['update_err'].
        The errors dict is a copy, so ``processing`` itself is not modified.
        """
        retries = processing['update_retries'] + 1
        if not processing['max_update_retries'] or retries < processing['max_update_retries']:
            proc_status = ProcessingStatus.Running
        else:
            proc_status = ProcessingStatus.Failed

        errors = dict(processing['errors']) if processing['errors'] else {}
        errors['update_err'] = {'msg': truncate_string('%s' % (ex), length=200)}

        return {'processing_id': processing['processing_id'],
                'parameters': {'status': proc_status,
                               'locking': ProcessingLocking.Idle,
                               'update_retries': retries,
                               'errors': errors}}

    def handle_trigger_processing(self, processing, trigger_new_updates=False):
        """
        Run the trigger logic for one processing and build the update to write back.

        :param processing: processing dict. Reads 'processing_id' and 'request_id'; on
            failure also 'update_retries', 'max_update_retries', 'errors' and (for
            ProcessFormatNotSupported) 'update_poll_period'.
        :param trigger_new_updates: forwarded to the module-level handle_trigger_processing.
        :returns: a dict that always has 'update_processing' and 'update_contents'. On
            success it also has 'messages', 'new_update_contents', 'update_transforms',
            'update_dep_contents', 'processing_status' and 'has_updates'. On failure the
            processing lock is reset to Idle and the error is recorded.
        """
        try:
            log_prefix = self.get_log_prefix(processing)
            executors = self.get_extra_executors() if self.enable_executors else None

            # Bare name: this calls the helper imported from .utils, not this method.
            ret_trigger_processing = handle_trigger_processing(processing,
                                                               self.agent_attributes,
                                                               trigger_new_updates=trigger_new_updates,
                                                               max_updates_per_round=self.max_updates_per_round,
                                                               executors=executors,
                                                               logger=self.logger,
                                                               log_prefix=log_prefix)
            (process_status, update_contents, ret_msgs, parameters,
             update_dep_contents_status_name, update_dep_contents_status,
             new_update_contents, ret_update_transforms, has_updates) = ret_trigger_processing

            self.logger.debug(log_prefix + "handle_trigger_processing: ret_update_transforms: %s" % str(ret_update_transforms))

            # A terminated status is recorded as Terminating, with the real status kept in
            # substatus. process_trigger_processing_real then sends a TerminatedProcessingEvent.
            if is_process_terminated(process_status):
                new_process_status = ProcessingStatus.Terminating
            else:
                new_process_status = process_status

            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': {'status': new_process_status,
                                                'substatus': process_status,
                                                'locking': ProcessingLocking.Idle}}
            if parameters:
                # Special parameters such as 'output_metadata'.
                update_processing['parameters'].update(parameters)

            ret = {'update_processing': update_processing,
                   'update_contents': update_contents,
                   'messages': ret_msgs,
                   'new_update_contents': new_update_contents,
                   'update_transforms': ret_update_transforms,
                   'update_dep_contents': (processing['request_id'], update_dep_contents_status_name, update_dep_contents_status),
                   'processing_status': new_process_status,
                   'has_updates': has_updates}
        except exceptions.ProcessFormatNotSupported as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            update_processing = self._build_error_update(processing, ex)
            # Back off: multiply the poll period by poll_period_increase_rate,
            # capped at max_update_poll_period.
            update_poll_period = int(processing['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
            update_processing['parameters']['update_poll_period'] = min(update_poll_period, self.max_update_poll_period)

            ret = {'update_processing': update_processing,
                   'update_contents': []}
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            update_processing = self._build_error_update(processing, ex)
            update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])

            ret = {'update_processing': update_processing,
                   'update_contents': []}
        return ret

    def process_trigger_processing_real(self, event=None, processing=None):
        """
        Trigger one processing, write the result back and send the follow-up event.

        :param event: optional event. When ``processing`` is None, the processing is looked
            up (and locked) by ``event._processing_id``; ``event._content`` is forwarded
            to the event this method sends.
        :param processing: optional processing dict to handle directly.
        :returns: ``ReturnCode.Ok.value``, including when no processing is found or it is
            already terminated. Returns ``ReturnCode.Failed.value`` if an exception was raised.
        """
        pro_ret = ReturnCode.Ok.value
        try:
            if processing is None and event:
                self.logger.info("process_trigger_processing, event: %s" % str(event))
                pr = self.get_processing(processing_id=event._processing_id, status=None,
                                         exclude_status=[ProcessingStatus.Prepared], locking=True)
                if not pr:
                    # Not treated as a failure.
                    self.logger.warning("Cannot find processing for event: %s" % str(event))
                elif pr['status'] in Terminated_processing_status:
                    # Already terminated: only reset locking to Idle (it was fetched with locking=True).
                    ret = {'update_processing': {'processing_id': pr['processing_id'],
                                                 'parameters': {'locking': ProcessingLocking.Idle}},
                           'update_contents': []}
                    self.update_processing(ret, pr)
                else:
                    processing = pr

            if processing:
                pr = processing
                log_pre = self.get_log_prefix(pr)
                self.logger.info(log_pre + "process_trigger_processing")
                ret = self.handle_trigger_processing(pr)

                # NOTE(review): new_update_contents is deliberately dropped before the
                # update is written; the reason is not visible in this file.
                ret['new_update_contents'] = None
                self.update_processing(ret, pr)

                event_content = event._content if event else None
                if (ret.get('processing_status') == ProcessingStatus.Terminating
                        or (event_content and 'Terminated' in event_content and event_content['Terminated'])):
                    self.logger.info(log_pre + "TerminatedProcessingEvent(processing_id: %s)" % pr['processing_id'])
                    new_event = TerminatedProcessingEvent(publisher_id=self.id,
                                                          processing_id=pr['processing_id'],
                                                          content=event_content)
                    new_event.set_terminating()
                    self.event_bus.send(new_event)
                else:
                    self.logger.info(log_pre + "SyncProcessingEvent(processing_id: %s)" % pr['processing_id'])
                    new_event = SyncProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'],
                                                    content=event_content)
                    self.event_bus.send(new_event)
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            pro_ret = ReturnCode.Failed.value
        return pro_ret

    def process_trigger_processing(self, event=None, processing=None):
        """Worker entry for TriggerProcessing events and polled processings; tracks number_workers."""
        self.number_workers += 1
        try:
            return self.process_trigger_processing_real(event=event, processing=processing)
        finally:
            # Always release the slot, even if something escapes the inner handler.
            self.number_workers -= 1

    def process_msg_trigger_processing(self, event):
        """Worker entry for MsgTriggerProcessing events; tracks number_msg_workers."""
        self.number_msg_workers += 1
        try:
            return self.process_trigger_processing_real(event)
        finally:
            self.number_msg_workers -= 1

    def init_event_function_map(self):
        """Map each handled event type to its capacity pre-check and its handler."""
        self.event_func_map = {
            EventType.TriggerProcessing: {
                'pre_check': self.is_ok_to_run_more_processings,
                'exec_func': self.process_trigger_processing
            },
            EventType.MsgTriggerProcessing: {
                'pre_check': self.is_ok_to_run_more_msg_processings,
                'exec_func': self.process_msg_trigger_processing
            }
        }

    def run(self):
        """
        Main run function.

        Initialises the agent and registers the event handlers. Schedules the polling
        task (get_trigger_processings, delay_time=10) and the min_request_id loader
        (load_min_request_id, delay_time=600), then runs the executor loop until
        interrupted.
        """
        try:
            self.logger.info("Starting main thread")
            self.init_thread_info()

            self.load_plugins()
            self.init()

            self.add_default_tasks()

            self.init_event_function_map()

            task = self.create_task(task_func=self.get_trigger_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
            self.add_task(task)

            task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
            self.add_task(task)

            self.execute()
        except KeyboardInterrupt:
            self.stop()
0290 
0291 
if __name__ == '__main__':
    # Script entry point: build a Trigger with default settings and invoke it.
    trigger_agent = Trigger()
    trigger_agent()