Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 import traceback
0012 
0013 from idds.common import exceptions
0014 from idds.common.constants import ProcessingType, ProcessingStatus, ProcessingLocking
0015 from idds.common.utils import setup_logging, truncate_string
0016 from idds.core import processings as core_processings
0017 from idds.agents.common.baseagent import BaseAgent
0018 from idds.agents.common.eventbus.event import (EventType,
0019                                                NewProcessingEvent,
0020                                                PreparedProcessingEvent,
0021                                                UpdateTransformEvent)
0022 
0023 from .utils import handle_new_processing, handle_prepared_processing
0024 from .iutils import handle_new_iprocessing
0025 from .poller import Poller
0026 
0027 setup_logging(__name__)
0028 
0029 
0030 class Submitter(Poller):
0031     """
0032     Submitter works to submit and running tasks to WFMS.
0033     """
0034 
0035     def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
0036                  name='Submitter', use_process_pool=False, message_bulk_size=1000, **kwargs):
0037         self.max_number_workers = max_number_workers
0038         self.set_max_workers()
0039         num_threads = self.max_number_workers
0040 
0041         super(Submitter, self).__init__(num_threads=num_threads, max_number_workers=self.max_number_workers,
0042                                         name=name, use_process_pool=use_process_pool,
0043                                         retrieve_bulk_size=retrieve_bulk_size, **kwargs)
0044         self.site_to_cloud = None
0045 
0046         self._new_processing_status = None
0047         self._prepared_processing_status = None
0048 
0049     def get_new_processings(self):
0050         """
0051         Get new processing
0052         """
0053         try:
0054             if not self.is_ok_to_run_more_processings():
0055                 return []
0056 
0057             self.show_queue_size()
0058 
0059             if BaseAgent.min_request_id is None:
0060                 return []
0061 
0062             processing_status = [ProcessingStatus.New]
0063             self._new_processing_status = processing_status
0064             processings = core_processings.get_processings_by_status(status=processing_status, locking=True,
0065                                                                      new_poll=True,
0066                                                                      min_request_id=BaseAgent.min_request_id,
0067                                                                      bulk_size=self.get_bulk_size())
0068 
0069             # self.logger.debug("Main thread get %s [new] processings to process" % len(processings))
0070             if processings:
0071                 processing_ids = [pr['processing_id'] for pr in processings]
0072                 self.logger.info("Main thread get [new] processings to process: %s" % str(processing_ids))
0073 
0074             for pr in processings:
0075                 self.submit(self.process_new_processing, **{"processing": pr})
0076 
0077             return processings
0078         except exceptions.DatabaseException as ex:
0079             if 'ORA-00060' in str(ex):
0080                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0081             else:
0082                 # raise ex
0083                 self.logger.error(ex)
0084                 self.logger.error(traceback.format_exc())
0085         return []
0086 
0087     def get_prepared_processings(self):
0088         """
0089         Get prepared processing
0090         """
0091         try:
0092             if not self.is_ok_to_run_more_processings():
0093                 return []
0094 
0095             self.show_queue_size()
0096 
0097             if BaseAgent.min_request_id is None:
0098                 return []
0099 
0100             processing_status = [ProcessingStatus.Prepared]
0101             self._prepared_processing_status = processing_status
0102             processings = core_processings.get_processings_by_status(status=processing_status, locking=True,
0103                                                                      new_poll=True,
0104                                                                      min_request_id=BaseAgent.min_request_id,
0105                                                                      bulk_size=self.get_bulk_size())
0106 
0107             # self.logger.debug("Main thread get %s [new] processings to process" % len(processings))
0108             if processings:
0109                 processing_ids = [pr['processing_id'] for pr in processings]
0110                 self.logger.info("Main thread get [prepared] processings to process: %s" % str(processing_ids))
0111 
0112             for pr in processings:
0113                 self.submit(self.process_prepared_processing, **{"processing": pr})
0114 
0115             return processings
0116         except exceptions.DatabaseException as ex:
0117             if 'ORA-00060' in str(ex):
0118                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0119             else:
0120                 # raise ex
0121                 self.logger.error(ex)
0122                 self.logger.error(traceback.format_exc())
0123         return []
0124 
0125     def get_site_to_cloud(self, site, log_prefix=''):
0126         try:
0127             if self.site_to_cloud is None:
0128                 self.logger.debug(log_prefix + " agent_attributes: %s" % str(self.agent_attributes))
0129                 self.site_to_cloud = {}
0130                 if self.agent_attributes and 'domapandawork' in self.agent_attributes and self.agent_attributes['domapandawork']:
0131                     if 'site_to_cloud' in self.agent_attributes['domapandawork'] and self.agent_attributes['domapandawork']['site_to_cloud']:
0132                         site_to_clouds = self.agent_attributes['domapandawork']['site_to_cloud'].split(",")
0133                         for site_to_cloud in site_to_clouds:
0134                             local_site, cloud = site_to_cloud.split(':')
0135                             if local_site not in self.site_to_cloud:
0136                                 self.site_to_cloud[local_site] = cloud
0137                 self.logger.debug(log_prefix + " site_to_cloud: %s" % self.site_to_cloud)
0138 
0139             if site and self.site_to_cloud:
0140                 cloud = self.site_to_cloud.get(site, None)
0141                 if cloud:
0142                     self.logger.debug(log_prefix + "cloud for site(%s): %s" % (site, cloud))
0143                     return cloud
0144             if 'default' in self.site_to_cloud:
0145                 cloud = self.site_to_cloud.get('default', None)
0146                 self.logger.debug(log_prefix + "cloud for default site(%s): %s" % (site, cloud))
0147                 return cloud
0148         except Exception as ex:
0149             self.logger.error(ex)
0150         return None
0151 
0152     def handle_new_processing(self, processing, check_previous=True):
0153         try:
0154             log_prefix = self.get_log_prefix(processing)
0155 
0156             # transform_id = processing['transform_id']
0157             # transform = core_transforms.get_transform(transform_id=transform_id)
0158             # work = transform['transform_metadata']['work']
0159             executors = None
0160             if self.enable_executors:
0161                 executors = self.get_extra_executors()
0162 
0163             if check_previous:
0164                 pre_works_are_ok = True
0165                 if processing['parent_internal_id']:
0166                     parent_internal_ids = processing['parent_internal_id'].split(",")
0167                     # remove processing['internal_id'] from parent_internal_ids if exists, for the case of retrying a failed processing, which will be updated with a new internal_id, but the parent_internal_id is not updated.
0168                     if processing['internal_id'] in parent_internal_ids:
0169                         parent_internal_ids.remove(processing['internal_id'])
0170                     if not parent_internal_ids:
0171                         pre_works_are_ok = True
0172                     else:
0173                         prs = core_processings.get_processings(
0174                             request_id=processing['request_id'],
0175                             internal_ids=parent_internal_ids,
0176                             loop_index=processing['loop_index']
0177                         )
0178                         if not prs:
0179                             pre_works_are_ok = False
0180                         else:
0181                             all_pr_internal_ids = []
0182                             for pr in prs:
0183                                 if pr['status'] not in [
0184                                     ProcessingStatus.Submitting,
0185                                     ProcessingStatus.Submitted,
0186                                     ProcessingStatus.Running,
0187                                     ProcessingStatus.Finished,
0188                                     ProcessingStatus.Failed,
0189                                     ProcessingStatus.FinishedOnStep,
0190                                     ProcessingStatus.FinishedOnExec,
0191                                     ProcessingStatus.FinishedTerm,
0192                                     ProcessingStatus.SubFinished,
0193                                     ProcessingStatus.Broken,
0194                                     ProcessingStatus.Terminating,
0195                                     ProcessingStatus.ToTrigger,
0196                                     ProcessingStatus.Triggering,
0197                                     ProcessingStatus.Synchronizing,
0198                                     ProcessingStatus.Prepared
0199                                 ]:
0200                                     pre_works_are_ok = False
0201                                     break
0202                                 all_pr_internal_ids.append(pr['internal_id'])
0203                             if set(parent_internal_ids) != set(all_pr_internal_ids):
0204                                 pre_works_are_ok = False
0205             else:
0206                 pre_works_are_ok = True
0207 
0208             if pre_works_are_ok:
0209                 ret_new_processing = handle_new_processing(processing,
0210                                                            self.agent_attributes,
0211                                                            func_site_to_cloud=self.get_site_to_cloud,
0212                                                            max_updates_per_round=self.max_updates_per_round,
0213                                                            executors=executors,
0214                                                            logger=self.logger,
0215                                                            log_prefix=log_prefix)
0216                 status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing
0217 
0218                 if not status:
0219                     raise exceptions.ProcessSubmitFailed(str(errors))
0220 
0221                 parameters = {'status': ProcessingStatus.Prepared,
0222                               'substatus': ProcessingStatus.Prepared,
0223                               'locking': ProcessingLocking.Idle,
0224                               'processing_metadata': processing['processing_metadata']}
0225                 parameters = self.load_poll_period(processing, parameters, new=True)
0226 
0227                 processing['substatus'] = ProcessingStatus.Prepared
0228                 update_processing = {'processing_id': processing['processing_id'],
0229                                      'parameters': parameters}
0230                 ret = {'update_processing': update_processing,
0231                        'update_collections': update_colls,
0232                        'update_contents': [],
0233                        'new_contents': new_contents,
0234                        'new_input_dependency_contents': new_input_dependency_contents,
0235                        'messages': msgs,
0236                        }
0237             else:
0238                 parameters = {'locking': ProcessingLocking.Idle}
0239                 update_processing = {'processing_id': processing['processing_id'],
0240                                      'parameters': parameters}
0241                 ret = {'update_processing': update_processing,
0242                        'update_collections': [],
0243                        'update_contents': [],
0244                        'new_contents': [],
0245                        'new_input_dependency_contents': [],
0246                        'messages': [],
0247                        }
0248         except Exception as ex:
0249             self.logger.error(ex)
0250             self.logger.error(traceback.format_exc())
0251 
0252             retries = processing['new_retries'] + 1
0253             if not processing['max_new_retries'] or retries < processing['max_new_retries']:
0254                 pr_status = processing['status']
0255             else:
0256                 pr_status = ProcessingStatus.Failed
0257             # increase poll period
0258             new_poll_period = int(processing['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0259             if new_poll_period > self.max_new_poll_period:
0260                 new_poll_period = self.max_new_poll_period
0261 
0262             error = {'process_new_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
0263             parameters = {'status': pr_status,
0264                           'locking': ProcessingLocking.Idle,
0265                           'new_poll_period': new_poll_period,
0266                           'errors': processing['errors'] if processing['errors'] else {},
0267                           'new_retries': retries}
0268             parameters['errors'].update(error)
0269 
0270             update_processing = {'processing_id': processing['processing_id'],
0271                                  'parameters': parameters}
0272             ret = {'update_processing': update_processing,
0273                    'update_contents': []}
0274         return ret
0275 
0276     def handle_new_iprocessing(self, processing):
0277         try:
0278             log_prefix = self.get_log_prefix(processing)
0279 
0280             # transform_id = processing['transform_id']
0281             # transform = core_transforms.get_transform(transform_id=transform_id)
0282             # work = transform['transform_metadata']['work']
0283             executors, plugin = None, None
0284             if processing['processing_type']:
0285                 plugin_name = processing['processing_type'].name.lower() + '_submitter'
0286                 plugin = self.get_plugin(plugin_name)
0287             else:
0288                 raise exceptions.ProcessSubmitFailed('No corresponding submitter plugins for %s' % processing['processing_type'])
0289             ret_new_processing = handle_new_iprocessing(processing,
0290                                                         self.agent_attributes,
0291                                                         plugin=plugin,
0292                                                         func_site_to_cloud=self.get_site_to_cloud,
0293                                                         max_updates_per_round=self.max_updates_per_round,
0294                                                         executors=executors,
0295                                                         logger=self.logger,
0296                                                         log_prefix=log_prefix)
0297             status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing
0298 
0299             if not status:
0300                 raise exceptions.ProcessSubmitFailed(str(errors))
0301 
0302             parameters = {'status': ProcessingStatus.Submitting,
0303                           'substatus': ProcessingStatus.Submitting,
0304                           'locking': ProcessingLocking.Idle,
0305                           'processing_metadata': processing['processing_metadata']}
0306             parameters = self.load_poll_period(processing, parameters, new=True)
0307 
0308             if 'submitted_at' in processing:
0309                 parameters['submitted_at'] = processing['submitted_at']
0310 
0311             if 'workload_id' in processing:
0312                 parameters['workload_id'] = processing['workload_id']
0313 
0314             update_processing = {'processing_id': processing['processing_id'],
0315                                  'parameters': parameters}
0316             ret = {'update_processing': update_processing,
0317                    'update_collections': update_colls,
0318                    'update_contents': [],
0319                    'new_contents': new_contents,
0320                    'new_input_dependency_contents': new_input_dependency_contents,
0321                    'messages': msgs,
0322                    }
0323         except Exception as ex:
0324             self.logger.error(ex)
0325             self.logger.error(traceback.format_exc())
0326 
0327             retries = processing['new_retries'] + 1
0328             if not processing['max_new_retries'] or retries < processing['max_new_retries']:
0329                 pr_status = processing['status']
0330             else:
0331                 pr_status = ProcessingStatus.Failed
0332             # increase poll period
0333             new_poll_period = int(processing['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0334             if new_poll_period > self.max_new_poll_period:
0335                 new_poll_period = self.max_new_poll_period
0336 
0337             error = {'submit_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
0338             parameters = {'status': pr_status,
0339                           'locking': ProcessingLocking.Idle,
0340                           'new_poll_period': new_poll_period,
0341                           'errors': processing['errors'] if processing['errors'] else {},
0342                           'new_retries': retries}
0343             parameters['errors'].update(error)
0344 
0345             update_processing = {'processing_id': processing['processing_id'],
0346                                  'parameters': parameters}
0347             ret = {'update_processing': update_processing,
0348                    'update_contents': []}
0349         return ret
0350 
0351     def handle_prepared_processing(self, processing, check_previous=False):
0352         try:
0353             log_prefix = self.get_log_prefix(processing)
0354 
0355             # transform_id = processing['transform_id']
0356             # transform = core_transforms.get_transform(transform_id=transform_id)
0357             # work = transform['transform_metadata']['work']
0358             executors = None
0359             if self.enable_executors:
0360                 executors = self.get_extra_executors()
0361 
0362             if check_previous:
0363                 pre_works_are_ok = True
0364                 if processing['parent_internal_id']:
0365                     parent_internal_ids = processing['parent_internal_id'].split(",")
0366                     prs = core_processings.get_processings(
0367                         request_id=processing['request_id'],
0368                         internal_ids=parent_internal_ids,
0369                         loop_index=processing['loop_index']
0370                     )
0371                     if not prs:
0372                         pre_works_are_ok = False
0373                     else:
0374                         all_pr_internal_ids = []
0375                         for pr in prs:
0376                             if not pr['workload_id']:
0377                                 pre_works_are_ok = False
0378                                 break
0379                             all_pr_internal_ids.append(pr['internal_id'])
0380                         if set(parent_internal_ids) != set(all_pr_internal_ids):
0381                             pre_works_are_ok = False
0382             else:
0383                 pre_works_are_ok = True
0384 
0385             if pre_works_are_ok:
0386                 ret_new_processing = handle_prepared_processing(processing,
0387                                                                 self.agent_attributes,
0388                                                                 func_site_to_cloud=self.get_site_to_cloud,
0389                                                                 max_updates_per_round=self.max_updates_per_round,
0390                                                                 executors=executors,
0391                                                                 logger=self.logger,
0392                                                                 log_prefix=log_prefix)
0393                 status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing
0394 
0395                 if not status:
0396                     raise exceptions.ProcessSubmitFailed(str(errors))
0397 
0398                 parameters = {'status': ProcessingStatus.Submitting,
0399                               'substatus': ProcessingStatus.Submitting,
0400                               'locking': ProcessingLocking.Idle,
0401                               'processing_metadata': processing['processing_metadata']}
0402                 parameters = self.load_poll_period(processing, parameters, new=True)
0403 
0404                 proc = processing['processing_metadata']['processing']
0405                 if proc.submitted_at:
0406                     if not processing['submitted_at'] or processing['submitted_at'] < proc.submitted_at:
0407                         parameters['submitted_at'] = proc.submitted_at
0408 
0409                 # if processing['processing_metadata'] and 'processing' in processing['processing_metadata']:
0410                 if proc.workload_id and not processing['workload_id']:
0411                     parameters['workload_id'] = proc.workload_id
0412 
0413                 update_processing = {'processing_id': processing['processing_id'],
0414                                      'parameters': parameters}
0415                 ret = {'update_processing': update_processing,
0416                        'update_collections': update_colls,
0417                        'update_contents': [],
0418                        'new_contents': new_contents,
0419                        'new_input_dependency_contents': new_input_dependency_contents,
0420                        'messages': msgs,
0421                        }
0422             else:
0423                 parameters = {'locking': ProcessingLocking.Idle}
0424                 update_processing = {'processing_id': processing['processing_id'],
0425                                      'parameters': parameters}
0426                 ret = {'update_processing': update_processing,
0427                        'update_collections': [],
0428                        'update_contents': [],
0429                        'new_contents': [],
0430                        'new_input_dependency_contents': [],
0431                        'messages': [],
0432                        }
0433         except Exception as ex:
0434             self.logger.error(ex)
0435             self.logger.error(traceback.format_exc())
0436 
0437             retries = processing['new_retries'] + 1
0438             if not processing['max_new_retries'] or retries < processing['max_new_retries']:
0439                 pr_status = processing['status']
0440             else:
0441                 pr_status = ProcessingStatus.Failed
0442             # increase poll period
0443             new_poll_period = int(processing['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0444             if new_poll_period < self.new_fail_poll_period:
0445                 new_poll_period = self.new_fail_poll_period
0446             if new_poll_period > self.max_new_poll_period:
0447                 new_poll_period = self.max_new_poll_period
0448 
0449             error = {'submit_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
0450             parameters = {'status': pr_status,
0451                           'locking': ProcessingLocking.Idle,
0452                           'new_poll_period': new_poll_period,
0453                           'errors': processing['errors'] if processing['errors'] else {},
0454                           'new_retries': retries}
0455             parameters['errors'].update(error)
0456 
0457             update_processing = {'processing_id': processing['processing_id'],
0458                                  'parameters': parameters}
0459             ret = {'update_processing': update_processing,
0460                    'update_contents': []}
0461         return ret
0462 
0463     def process_new_processing(self, event=None, processing=None):
0464         self.number_workers += 1
0465         try:
0466             if processing is None and event:
0467                 # pr_status = [ProcessingStatus.New]
0468                 self.logger.info("process_new_processing, event: %s" % str(event))
0469                 pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
0470                 if not pr:
0471                     self.logger.warn("Cannot find processing for event: %s" % str(event))
0472                 elif self._new_processing_status and pr['status'] not in self._new_processing_status:
0473                     parameters = {'locking': ProcessingLocking.Idle}
0474                     update_processing = {'processing_id': pr['processing_id'],
0475                                          'parameters': parameters}
0476                     ret = {'update_processing': update_processing,
0477                            'update_contents': []}
0478                     self.update_processing(ret, pr)
0479                 else:
0480                     processing = pr
0481             if processing:
0482                 pr = processing
0483                 log_pre = self.get_log_prefix(pr)
0484                 self.logger.info(log_pre + "process_new_processing")
0485                 if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0486                     ret = self.handle_new_iprocessing(pr)
0487                     # self.logger.info(log_pre + "process_new_processing result: %s" % str(ret))
0488 
0489                     self.update_processing(ret, pr)
0490 
0491                     self.logger.info(log_pre + "PreparedProcessingEvent(processing_id: %s)" % pr['processing_id'])
0492                     event = PreparedProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
0493                     event.set_has_updates()
0494                     self.event_bus.send(event)
0495                 else:
0496                     ret = self.handle_new_processing(pr)
0497                     self.update_processing(ret, pr)
0498                     if processing['substatus'] in [ProcessingStatus.Prepared]:
0499                         self.logger.info(log_pre + "PreparedProcessingEvent(processing_id: %s)" % pr['processing_id'])
0500                         event = PreparedProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
0501                         event.set_has_updates()
0502                         self.event_bus.send(event)
0503 
0504                         # get following processings
0505                         following_prs = core_processings.get_processings(
0506                             request_id=processing['request_id'],
0507                             parent_internal_ids=[processing['internal_id']],
0508                             loop_index=processing['loop_index']
0509                         )
0510                         if following_prs:
0511                             for following_pr in following_prs:
0512                                 self.logger.info(log_pre + "NewProcessingEvent(processing_id: %s)" % following_pr['processing_id'])
0513                                 event = NewProcessingEvent(publisher_id=self.id, processing_id=following_pr['processing_id'])
0514                                 self.event_bus.send(event)
0515 
0516         except Exception as ex:
0517             self.logger.error(ex)
0518             self.logger.error(traceback.format_exc())
0519         self.number_workers -= 1
0520 
0521     def process_prepared_processing(self, event=None, processing=None):
0522         self.number_workers += 1
0523         try:
0524             if processing is None and event:
0525                 # pr_status = [ProcessingStatus.New]
0526                 self.logger.info("process_prepared_processing, event: %s" % str(event))
0527                 pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
0528                 if not pr:
0529                     self.logger.warn("Cannot find processing for event: %s" % str(event))
0530                 elif self._prepared_processing_status and pr['status'] not in self._prepared_processing_status:
0531                     parameters = {'locking': ProcessingLocking.Idle}
0532                     update_processing = {'processing_id': pr['processing_id'],
0533                                          'parameters': parameters}
0534                     ret = {'update_processing': update_processing,
0535                            'update_contents': []}
0536                     self.update_processing(ret, pr)
0537                 else:
0538                     processing = pr
0539             if processing:
0540                 pr = processing
0541                 log_pre = self.get_log_prefix(pr)
0542                 self.logger.info(log_pre + "process_prepared_processing")
0543                 if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0544                     # ret = self.handle_new_iprocessing(pr)
0545                     self.logger.error(log_pre + "process_prepared_processing, iWorkflow/iWork should not have status prepared")
0546                     parameters = {'locking': ProcessingLocking.Idle}
0547                     update_processing = {'processing_id': pr['processing_id'],
0548                                          'parameters': parameters}
0549                     ret = {'update_processing': update_processing,
0550                            'update_contents': []}
0551                     self.update_processing(ret, pr)
0552                 else:
0553                     ret = self.handle_prepared_processing(pr)
0554                     # self.logger.info(log_pre + "process_new_processing result: %s" % str(ret))
0555 
0556                     self.update_processing(ret, pr)
0557 
0558                     self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
0559                     submit_event_content = {'event': 'submitted'}
0560                     event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=submit_event_content)
0561                     event.set_has_updates()
0562                     self.event_bus.send(event)
0563 
0564                     # self.logger.info(log_pre + "SyncProcessingEvent(processing_id: %s)" % pr['processing_id'])
0565                     # event = SyncProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
0566                     # event.set_has_updates()
0567                     # self.event_bus.send(event)
0568         except Exception as ex:
0569             self.logger.error(ex)
0570             self.logger.error(traceback.format_exc())
0571         self.number_workers -= 1
0572 
0573     def init_event_function_map(self):
0574         self.event_func_map = {
0575             EventType.NewProcessing: {
0576                 'pre_check': self.is_ok_to_run_more_processings,
0577                 'exec_func': self.process_new_processing
0578             },
0579             EventType.PreparedProcessing: {
0580                 'pre_check': self.is_ok_to_run_more_processings,
0581                 'exec_func': self.process_prepared_processing
0582             }
0583         }
0584 
0585     def run(self):
0586         """
0587         Main run function.
0588         """
0589         try:
0590             self.logger.info("Starting main thread")
0591             self.init_thread_info()
0592 
0593             self.load_plugins()
0594             self.init()
0595 
0596             self.add_default_tasks()
0597 
0598             self.init_event_function_map()
0599 
0600             task = self.create_task(task_func=self.get_new_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
0601             self.add_task(task)
0602 
0603             task = self.create_task(task_func=self.get_prepared_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
0604             self.add_task(task)
0605 
0606             task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
0607             self.add_task(task)
0608 
0609             self.execute()
0610         except KeyboardInterrupt:
0611             self.stop()
0612 
0613 
0614 if __name__ == '__main__':
0615     agent = Submitter()
0616     agent()