Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 import time
0012 import traceback
0013 
0014 from idds.common import exceptions
0015 from idds.common.constants import (Sections, ReturnCode, ProcessingType,
0016                                    ProcessingStatus, ProcessingLocking,
0017                                    Terminated_processing_status)
0018 from idds.common.utils import setup_logging, truncate_string
0019 from idds.core import processings as core_processings
0020 from idds.agents.common.baseagent import BaseAgent
0021 from idds.agents.common.eventbus.event import (EventType,
0022                                                UpdateProcessingEvent,
0023                                                UpdateTransformEvent)
0024 
0025 
0026 from .utils import (handle_abort_processing,
0027                     handle_resume_processing,
0028                     # is_process_terminated,
0029                     sync_processing)
0030 from .iutils import sync_iprocessing, handle_abort_iprocessing, handle_resume_iprocessing
0031 from .poller import Poller
0032 
0033 setup_logging(__name__)
0034 
0035 
0036 class Finisher(Poller):
0037     """
0038     Finisher handles terminating and synchronizing processings, as well as abort and resume requests for processings.
0039     """
0040 
0041     def __init__(self, num_threads=1, finisher_max_number_workers=None, max_number_workers=3, poll_time_period=10, retries=3, retrieve_bulk_size=2,
0042                  use_process_pool=False, message_bulk_size=1000, **kwargs):
0043         if finisher_max_number_workers:
0044             self.max_number_workers = int(finisher_max_number_workers)
0045         else:
0046             self.max_number_workers = int(max_number_workers)
0047         self.set_max_workers()
0048 
0049         num_threads = int(self.max_number_workers)
0050 
0051         super(Finisher, self).__init__(num_threads=num_threads, max_number_workers=self.max_number_workers,
0052                                        name='Finisher', use_process_pool=use_process_pool,
0053                                        poll_time_period=poll_time_period, retries=retries,
0054                                        retrieve_bulk_size=retrieve_bulk_size,
0055                                        message_bulk_size=message_bulk_size, **kwargs)
0056         self.logger.info("num_threads: %s" % num_threads)
0057 
0058         self.config_section = Sections.Carrier
0059         self.poll_time_period = int(poll_time_period)
0060         self.retries = int(retries)
0061 
0062         if hasattr(self, 'finisher_max_number_workers'):
0063             self.max_number_workers = int(self.finisher_max_number_workers)
0064 
0065         self.show_queue_size_time = None
0066 
0067     def show_queue_size(self):
0068         if self.show_queue_size_time is None or time.time() - self.show_queue_size_time >= 600:
0069             self.show_queue_size_time = time.time()
0070             q_str = "number of processings: %s, max number of processings: %s" % (self.get_num_workers(), self.get_max_workers())
0071             self.logger.debug(q_str)
0072 
0073     def get_finishing_processings(self):
0074         """
0075         Get finishing processing
0076         """
0077         try:
0078             if not self.is_ok_to_run_more_processings():
0079                 return []
0080 
0081             self.show_queue_size()
0082 
0083             if BaseAgent.min_request_id is None:
0084                 return []
0085 
0086             processing_status = [ProcessingStatus.Terminating, ProcessingStatus.Synchronizing]
0087             # next_poll_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_period)
0088             processings = core_processings.get_processings_by_status(status=processing_status,
0089                                                                      locking=True, update_poll=True,
0090                                                                      min_request_id=BaseAgent.min_request_id,
0091                                                                      bulk_size=self.retrieve_bulk_size)
0092 
0093             # self.logger.debug("Main thread get %s [submitting + submitted + running] processings to process" % (len(processings)))
0094             if processings:
0095                 processing_ids = [pr['processing_id'] for pr in processings]
0096                 self.logger.info("Main thread get terminating/synchronizing processings to process: %s" % (str(processing_ids)))
0097 
0098             for pr in processings:
0099                 pr_status = pr['status']
0100                 if pr_status in [ProcessingStatus.Terminating]:
0101                     self.submit(self.process_terminated_processing, **{"processing": pr})
0102                 elif pr_status in [ProcessingStatus.Synchronizing]:
0103                     self.submit(self.process_sync_processing, **{"processing": pr})
0104         except exceptions.DatabaseException as ex:
0105             if 'ORA-00060' in str(ex):
0106                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0107             else:
0108                 # raise ex
0109                 self.logger.error(ex)
0110                 self.logger.error(traceback.format_exc())
0111         return []
0112 
0113     def handle_sync_processing(self, processing, log_prefix=""):
0114         """
0115         process terminated processing
0116         """
0117         try:
0118             processing, update_collections, messages = sync_processing(processing, self.agent_attributes, logger=self.logger, log_prefix=log_prefix)
0119 
0120             update_processing = {'processing_id': processing['processing_id'],
0121                                  'parameters': {'status': processing['status'],
0122                                                 'locking': ProcessingLocking.Idle}}
0123             ret = {'update_processing': update_processing,
0124                    'update_collections': update_collections,
0125                    'messages': messages}
0126             return ret
0127         except Exception as ex:
0128             self.logger.error(ex)
0129             self.logger.error(traceback.format_exc())
0130             error = {'sync_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0131             update_processing = {'processing_id': processing['processing_id'],
0132                                  'parameters': {'status': ProcessingStatus.Running,
0133                                                 'locking': ProcessingLocking.Idle,
0134                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0135             update_processing['parameters']['errors'].update(error)
0136             ret = {'update_processing': update_processing}
0137             return ret
0138         return None
0139 
0140     def handle_sync_iprocessing(self, processing, log_prefix=""):
0141         """
0142         process terminated processing
0143         """
0144         try:
0145             processing, update_collections, messages = sync_iprocessing(processing, self.agent_attributes, logger=self.logger, log_prefix=log_prefix)
0146 
0147             update_processing = {'processing_id': processing['processing_id'],
0148                                  'parameters': {'status': processing['status'],
0149                                                 'locking': ProcessingLocking.Idle}}
0150             ret = {'update_processing': update_processing,
0151                    'update_collections': update_collections,
0152                    'messages': messages}
0153             return ret
0154         except Exception as ex:
0155             self.logger.error(ex)
0156             self.logger.error(traceback.format_exc())
0157             error = {'sync_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0158             update_processing = {'processing_id': processing['processing_id'],
0159                                  'parameters': {'status': ProcessingStatus.Running,
0160                                                 'locking': ProcessingLocking.Idle,
0161                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0162             update_processing['parameters']['errors'].update(error)
0163             ret = {'update_processing': update_processing}
0164             return ret
0165         return None
0166 
0167     def process_sync_processing(self, event=None, processing=None):
0168         self.number_workers += 1
0169         pro_ret = ReturnCode.Ok.value
0170         try:
0171             if processing is None and event:
0172                 self.logger.info("process_sync_processing: event: %s" % event)
0173                 pr = self.get_processing(processing_id=event._processing_id, exclude_status=[ProcessingStatus.Prepared], locking=True)
0174                 if not pr:
0175                     self.logger.error("Cannot find processing for event: %s" % str(event))
0176                     # pro_ret = ReturnCode.Locked.value
0177                     pro_ret = ReturnCode.Ok.value
0178                 elif pr['status'] in Terminated_processing_status:
0179                     parameters = {'locking': ProcessingLocking.Idle}
0180                     update_processing = {'processing_id': pr['processing_id'],
0181                                          'parameters': parameters}
0182                     ret = {'update_processing': update_processing,
0183                            'update_contents': []}
0184                     self.update_processing(ret, pr, renew_updated_at=True)
0185                     pro_ret = ReturnCode.Ok.value
0186                 else:
0187                     processing = pr
0188             if processing:
0189                 pr = processing
0190                 log_pre = self.get_log_prefix(pr)
0191 
0192                 self.logger.info(log_pre + "process_sync_processing")
0193                 if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0194                     ret = self.handle_sync_iprocessing(pr, log_prefix=log_pre)
0195                 else:
0196                     ret = self.handle_sync_processing(pr, log_prefix=log_pre)
0197                 ret_copy = {}
0198                 for ret_key in ret:
0199                     if ret_key != 'messages':
0200                         ret_copy[ret_key] = ret[ret_key]
0201                 self.logger.info(log_pre + "process_sync_processing result: %s" % str(ret_copy))
0202 
0203                 self.update_processing(ret, pr)
0204 
0205                 # no need to update transform
0206                 # self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
0207                 # event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'])
0208                 # self.event_bus.send(event)
0209         except Exception as ex:
0210             self.logger.error(ex)
0211             self.logger.error(traceback.format_exc())
0212             pro_ret = ReturnCode.Failed.value
0213         self.number_workers -= 1
0214         return pro_ret
0215 
0216     def handle_terminated_processing(self, processing, log_prefix=""):
0217         """
0218         process terminated processing
0219         """
0220         try:
0221             processing, update_collections, messages = sync_processing(processing, self.agent_attributes, terminate=True, logger=self.logger, log_prefix=log_prefix)
0222 
0223             update_processing = {'processing_id': processing['processing_id'],
0224                                  'parameters': {'status': processing['status'],
0225                                                 'locking': ProcessingLocking.Idle}}
0226             ret = {'update_processing': update_processing,
0227                    'update_collections': update_collections,
0228                    'messages': messages}
0229 
0230             return ret
0231         except Exception as ex:
0232             self.logger.error(ex)
0233             self.logger.error(traceback.format_exc())
0234             error = {'term_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0235             update_processing = {'processing_id': processing['processing_id'],
0236                                  'parameters': {'status': ProcessingStatus.Running,
0237                                                 'locking': ProcessingLocking.Idle,
0238                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0239             update_processing['parameters']['errors'].update(error)
0240             ret = {'update_processing': update_processing}
0241             return ret
0242         return None
0243 
0244     def handle_terminated_iprocessing(self, processing, log_prefix=""):
0245         """
0246         process terminated processing
0247         """
0248         try:
0249             processing, update_collections, messages = sync_iprocessing(processing, self.agent_attributes, terminate=True, logger=self.logger, log_prefix=log_prefix)
0250 
0251             update_processing = {'processing_id': processing['processing_id'],
0252                                  'parameters': {'status': processing['status'],
0253                                                 'locking': ProcessingLocking.Idle}}
0254             ret = {'update_processing': update_processing,
0255                    'update_collections': update_collections,
0256                    'messages': messages}
0257 
0258             return ret
0259         except Exception as ex:
0260             self.logger.error(ex)
0261             self.logger.error(traceback.format_exc())
0262             error = {'term_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0263             update_processing = {'processing_id': processing['processing_id'],
0264                                  'parameters': {'status': ProcessingStatus.Running,
0265                                                 'locking': ProcessingLocking.Idle,
0266                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0267             update_processing['parameters']['errors'].update(error)
0268             ret = {'update_processing': update_processing}
0269             return ret
0270         return None
0271 
0272     def process_terminated_processing(self, event=None, processing=None):
0273         self.number_workers += 1
0274         pro_ret = ReturnCode.Ok.value
0275         try:
0276             if processing is None and event:
0277                 if event._counter > 3:
0278                     self.logger.warn("Event counter is bigger than 3, skip event: %s" % str(event))
0279                 else:
0280                     pr = self.get_processing(processing_id=event._processing_id, exclude_status=[ProcessingStatus.Prepared], locking=True)
0281                     if not pr:
0282                         self.logger.error("Cannot find processing for event: %s" % str(event))
0283                         # pro_ret = ReturnCode.Locked.value
0284                         pro_ret = ReturnCode.Ok.value
0285                     elif pr['status'] in Terminated_processing_status:
0286                         parameters = {'locking': ProcessingLocking.Idle}
0287                         update_processing = {'processing_id': pr['processing_id'],
0288                                              'parameters': parameters}
0289                         ret = {'update_processing': update_processing,
0290                                'update_contents': []}
0291                         self.update_processing(ret, pr)
0292                         pro_ret = ReturnCode.Ok.value
0293                     else:
0294                         processing = pr
0295             if processing:
0296                 pr = processing
0297                 log_pre = self.get_log_prefix(pr)
0298 
0299                 self.logger.info(log_pre + "process_terminated_processing")
0300                 if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0301                     ret = self.handle_terminated_iprocessing(pr, log_prefix=log_pre)
0302                 else:
0303                     ret = self.handle_terminated_processing(pr, log_prefix=log_pre)
0304                 ret_copy = {}
0305                 for ret_key in ret:
0306                     if ret_key != 'messages':
0307                         ret_copy[ret_key] = ret[ret_key]
0308                 self.logger.info(log_pre + "process_terminated_processing result: %s" % str(ret_copy))
0309 
0310                 self.update_processing(ret, pr)
0311                 self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
0312                 event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'])
0313                 self.event_bus.send(event)
0314 
0315                 if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0316                     pass
0317                 else:
0318                     if pr['status'] not in [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.SubFinished, ProcessingStatus.Broken]:
0319                         # some files are missing, poll it.
0320                         self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % pr['processing_id'])
0321                         event = UpdateProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'],
0322                                                       counter=event._counter + 1 if event else 0)
0323                         event.set_terminating()
0324                         self.event_bus.send(event)
0325         except Exception as ex:
0326             self.logger.error(ex)
0327             self.logger.error(traceback.format_exc())
0328             pro_ret = ReturnCode.Failed.value
0329         self.number_workers -= 1
0330         return pro_ret
0331 
0332     def handle_abort_processing(self, processing, log_prefix=""):
0333         """
0334         process abort processing
0335         """
0336         try:
0337             processing, update_collections, update_contents, messages = handle_abort_processing(processing, self.agent_attributes, logger=self.logger, log_prefix=log_prefix)
0338 
0339             update_processing = {'processing_id': processing['processing_id'],
0340                                  'parameters': {'status': processing['status'],
0341                                                 'locking': ProcessingLocking.Idle}}
0342             ret = {'update_processing': update_processing,
0343                    'update_collections': update_collections,
0344                    'update_contents': update_contents,
0345                    'messages': messages
0346                    }
0347             return ret
0348         except Exception as ex:
0349             self.logger.error(ex)
0350             self.logger.error(traceback.format_exc())
0351             error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0352             update_processing = {'processing_id': processing['processing_id'],
0353                                  'parameters': {'status': ProcessingStatus.ToCancel,
0354                                                 'locking': ProcessingLocking.Idle,
0355                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0356             update_processing['parameters']['errors'].update(error)
0357             ret = {'update_processing': update_processing}
0358             return ret
0359         return None
0360 
0361     def handle_abort_iprocessing(self, processing, log_prefix=""):
0362         """
0363         process abort processing
0364         """
0365         try:
0366             plugin = None
0367             if processing['processing_type']:
0368                 plugin_name = processing['processing_type'].name.lower() + '_poller'
0369                 plugin = self.get_plugin(plugin_name)
0370             else:
0371                 raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])
0372 
0373             processing_status, update_collections, update_contents, messages = handle_abort_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix)
0374 
0375             update_processing = {'processing_id': processing['processing_id'],
0376                                  'parameters': {'status': processing_status,
0377                                                 'substatus': ProcessingStatus.ToCancel,
0378                                                 'locking': ProcessingLocking.Idle}}
0379             ret = {'update_processing': update_processing,
0380                    'update_collections': update_collections,
0381                    'update_contents': update_contents,
0382                    'messages': messages
0383                    }
0384             return ret
0385         except Exception as ex:
0386             self.logger.error(ex)
0387             self.logger.error(traceback.format_exc())
0388             error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0389             update_processing = {'processing_id': processing['processing_id'],
0390                                  'parameters': {'status': ProcessingStatus.ToCancel,
0391                                                 'locking': ProcessingLocking.Idle,
0392                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0393             update_processing['parameters']['errors'].update(error)
0394             ret = {'update_processing': update_processing}
0395             return ret
0396         return None
0397 
0398     def process_abort_processing(self, event):
0399         self.number_workers += 1
0400         pro_ret = ReturnCode.Ok.value
0401         try:
0402             if event:
0403                 processing_status = [ProcessingStatus.Finished, ProcessingStatus.Failed,
0404                                      ProcessingStatus.Lost, ProcessingStatus.Cancelled,
0405                                      ProcessingStatus.Suspended, ProcessingStatus.Expired,
0406                                      ProcessingStatus.Broken]
0407 
0408                 pr = self.get_processing(processing_id=event._processing_id, exclude_status=[ProcessingStatus.Prepared], locking=True)
0409 
0410                 if not pr:
0411                     self.logger.error("Cannot find processing for event: %s" % str(event))
0412                     pro_ret = ReturnCode.Locked.value
0413                 else:
0414                     log_pre = self.get_log_prefix(pr)
0415                     self.logger.info(log_pre + "process_abort_processing")
0416 
0417                     if pr and pr['status'] in processing_status:
0418                         update_processing = {'processing_id': pr['processing_id'],
0419                                              'parameters': {'locking': ProcessingLocking.Idle,
0420                                                             'errors': {'abort_err': {'msg': truncate_string("Processing is already terminated. Cannot be aborted", length=200)}}}}
0421                         ret = {'update_processing': update_processing}
0422                         self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret))
0423                         self.update_processing(ret, pr)
0424                     elif pr:
0425                         if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0426                             ret = self.handle_abort_iprocessing(pr, log_prefix=log_pre)
0427                             self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret))
0428 
0429                             self.update_processing(ret, pr, use_bulk_update_mappings=False)
0430 
0431                             self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
0432                             event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content if event else None)
0433                             self.event_bus.send(event)
0434                         else:
0435                             ret = self.handle_abort_processing(pr, log_prefix=log_pre)
0436                             ret_copy = {}
0437                             for ret_key in ret:
0438                                 if ret_key != 'messages':
0439                                     ret_copy[ret_key] = ret[ret_key]
0440                             self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret_copy))
0441 
0442                             self.update_processing(ret, pr)
0443                             self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
0444                             event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content if event else None)
0445                             self.event_bus.send(event)
0446         except Exception as ex:
0447             self.logger.error(ex)
0448             self.logger.error(traceback.format_exc())
0449             pro_ret = ReturnCode.Failed.value
0450         self.number_workers -= 1
0451         return pro_ret
0452 
0453     def handle_resume_processing(self, processing, log_prefix=""):
0454         """
0455         process resume processing
0456         """
0457         try:
0458             processing, update_collections, update_contents = handle_resume_processing(processing, self.agent_attributes, logger=self.logger, log_prefix=log_prefix)
0459 
0460             update_processing = {'processing_id': processing['processing_id'],
0461                                  'parameters': {'status': processing['status'],
0462                                                 'locking': ProcessingLocking.Idle}}
0463             ret = {'update_processing': update_processing,
0464                    'update_collections': update_collections,
0465                    'update_contents': update_contents,
0466                    }
0467             return ret
0468         except Exception as ex:
0469             self.logger.error(ex)
0470             self.logger.error(traceback.format_exc())
0471             error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0472             update_processing = {'processing_id': processing['processing_id'],
0473                                  'parameters': {'status': ProcessingStatus.ToResume,
0474                                                 'locking': ProcessingLocking.Idle,
0475                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0476             update_processing['parameters']['errors'].update(error)
0477             ret = {'update_processing': update_processing}
0478             return ret
0479         return None
0480 
0481     def handle_resume_iprocessing(self, processing, log_prefix=""):
0482         """
0483         process resume processing
0484         """
0485         try:
0486             plugin = None
0487             if processing['processing_type']:
0488                 plugin_name = processing['processing_type'].name.lower() + '_poller'
0489                 plugin = self.get_plugin(plugin_name)
0490             else:
0491                 raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])
0492 
0493             processing_status, update_collections, update_contents = handle_resume_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix)
0494 
0495             update_processing = {'processing_id': processing['processing_id'],
0496                                  'parameters': {'status': processing_status,
0497                                                 'substatus': ProcessingStatus.ToResume,
0498                                                 'locking': ProcessingLocking.Idle}}
0499             ret = {'update_processing': update_processing,
0500                    'update_collections': update_collections,
0501                    'update_contents': update_contents,
0502                    }
0503             return ret
0504         except Exception as ex:
0505             self.logger.error(ex)
0506             self.logger.error(traceback.format_exc())
0507             error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0508             update_processing = {'processing_id': processing['processing_id'],
0509                                  'parameters': {'status': ProcessingStatus.ToResume,
0510                                                 'locking': ProcessingLocking.Idle,
0511                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0512             update_processing['parameters']['errors'].update(error)
0513             ret = {'update_processing': update_processing}
0514             return ret
0515         return None
0516 
0517     def process_resume_processing(self, event):
0518         self.number_workers += 1
0519         pro_ret = ReturnCode.Ok.value
0520         try:
0521             if event:
0522                 processing_status = [ProcessingStatus.Finished]
0523 
0524                 pr = self.get_processing(processing_id=event._processing_id, exclude_status=[ProcessingStatus.Prepared], locking=True)
0525 
0526                 if not pr:
0527                     self.logger.error("Cannot find processing for event: %s" % str(event))
0528                     pro_ret = ReturnCode.Locked.value
0529                 else:
0530                     log_pre = self.get_log_prefix(pr)
0531                     self.logger.info(log_pre + "process_resume_processing")
0532 
0533                     if pr and pr['status'] in processing_status:
0534                         update_processing = {'processing_id': pr['processing_id'],
0535                                              'parameters': {'locking': ProcessingLocking.Idle,
0536                                                             'errors': {'abort_err': {'msg': truncate_string("Processing has already finished. Cannot be resumed", length=200)}}}}
0537                         ret = {'update_processing': update_processing}
0538 
0539                         self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))
0540 
0541                         self.update_processing(ret, pr)
0542                     elif pr:
0543                         if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0544                             ret = self.handle_resume_iprocessing(pr, log_prefix=log_pre)
0545                             self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))
0546 
0547                             self.update_processing(ret, pr, use_bulk_update_mappings=False)
0548 
0549                             self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
0550                             event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content if event else None)
0551                             self.event_bus.send(event)
0552                         else:
0553                             ret = self.handle_resume_processing(pr, log_prefix=log_pre)
0554                             self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))
0555 
0556                             self.update_processing(ret, pr, use_bulk_update_mappings=False)
0557 
0558                             self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
0559                             event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content if event else None)
0560                             self.event_bus.send(event)
0561         except Exception as ex:
0562             self.logger.error(ex)
0563             self.logger.error(traceback.format_exc())
0564             pro_ret = ReturnCode.Failed.value
0565         self.number_workers -= 1
0566         return pro_ret
0567 
0568     def init_event_function_map(self):
0569         self.event_func_map = {
0570             EventType.SyncProcessing: {
0571                 'pre_check': self.is_ok_to_run_more_processings,
0572                 'exec_func': self.process_sync_processing
0573             },
0574             EventType.TerminatedProcessing: {
0575                 'pre_check': self.is_ok_to_run_more_processings,
0576                 'exec_func': self.process_terminated_processing
0577             },
0578             EventType.AbortProcessing: {
0579                 'pre_check': self.is_ok_to_run_more_processings,
0580                 'exec_func': self.process_abort_processing
0581             },
0582             EventType.ResumeProcessing: {
0583                 'pre_check': self.is_ok_to_run_more_processings,
0584                 'exec_func': self.process_resume_processing
0585             }
0586         }
0587 
0588     def run(self):
0589         """
0590         Main run function.
0591         """
0592         try:
0593             self.logger.info("Starting main thread")
0594             self.init_thread_info()
0595 
0596             self.load_plugins()
0597             self.init()
0598 
0599             self.add_default_tasks()
0600 
0601             self.init_event_function_map()
0602 
0603             task = self.create_task(task_func=self.get_finishing_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
0604             self.add_task(task)
0605 
0606             task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
0607             self.add_task(task)
0608 
0609             self.execute()
0610         except KeyboardInterrupt:
0611             self.stop()
0612 
0613 
if __name__ == '__main__':
    # Script entry point: build a Finisher with default settings and invoke it.
    Finisher()()