File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import time
0012 import traceback
0013
0014 from idds.common import exceptions
0015 from idds.common.constants import (Sections, ReturnCode, ProcessingType,
0016 ProcessingStatus, ProcessingLocking,
0017 Terminated_processing_status)
0018 from idds.common.utils import setup_logging, truncate_string
0019 from idds.core import processings as core_processings
0020 from idds.agents.common.baseagent import BaseAgent
0021 from idds.agents.common.eventbus.event import (EventType,
0022 UpdateProcessingEvent,
0023 UpdateTransformEvent)
0024
0025
0026 from .utils import (handle_abort_processing,
0027 handle_resume_processing,
0028
0029 sync_processing)
0030 from .iutils import sync_iprocessing, handle_abort_iprocessing, handle_resume_iprocessing
0031 from .poller import Poller
0032
0033 setup_logging(__name__)
0034
0035
0036 class Finisher(Poller):
0037 """
0038 Finisher finalizes processings that are terminating or synchronizing, and handles abort and resume requests for processings.
0039 """
0040
def __init__(self, num_threads=1, finisher_max_number_workers=None, max_number_workers=3, poll_time_period=10, retries=3, retrieve_bulk_size=2,
             use_process_pool=False, message_bulk_size=1000, **kwargs):
    """
    Set up the Finisher agent.

    The worker limit comes from ``finisher_max_number_workers`` when that
    value is truthy, otherwise from ``max_number_workers``. The incoming
    ``num_threads`` argument is not used: the thread count passed to the
    parent initializer is always the resolved worker limit.

    :param num_threads: accepted for interface compatibility; overwritten.
    :param finisher_max_number_workers: preferred worker limit, if set.
    :param max_number_workers: fallback worker limit.
    :param poll_time_period: forwarded to the parent and stored as int.
    :param retries: forwarded to the parent and stored as int.
    :param retrieve_bulk_size: forwarded to the parent initializer.
    :param use_process_pool: forwarded to the parent initializer.
    :param message_bulk_size: forwarded to the parent initializer.
    :param kwargs: any further options, forwarded to the parent initializer.
    """
    # The limit is stored and set_max_workers() is called before the parent
    # initializer runs; this ordering is kept exactly as it was.
    chosen_limit = finisher_max_number_workers if finisher_max_number_workers else max_number_workers
    self.max_number_workers = int(chosen_limit)
    self.set_max_workers()

    # The thread count always follows the worker limit.
    num_threads = int(self.max_number_workers)

    super(Finisher, self).__init__(
        num_threads=num_threads,
        max_number_workers=self.max_number_workers,
        name='Finisher',
        use_process_pool=use_process_pool,
        poll_time_period=poll_time_period,
        retries=retries,
        retrieve_bulk_size=retrieve_bulk_size,
        message_bulk_size=message_bulk_size,
        **kwargs)
    self.logger.info("num_threads: %s" % num_threads)

    self.config_section = Sections.Carrier
    self.poll_time_period = int(poll_time_period)
    self.retries = int(retries)

    # NOTE(review): presumably the parent initializer can populate this
    # attribute from configuration - confirm. When present it wins.
    if hasattr(self, 'finisher_max_number_workers'):
        self.max_number_workers = int(self.finisher_max_number_workers)

    # Timestamp of the last queue-size log line; None means "never logged".
    self.show_queue_size_time = None
0066
def show_queue_size(self):
    """
    Log the current and maximum number of workers at debug level.

    The message is emitted on the first call and afterwards only when at
    least 600 seconds have passed since the previous emission.
    """
    previous = self.show_queue_size_time
    if previous is not None and time.time() - previous < 600:
        # Logged recently enough; stay quiet.
        return

    self.show_queue_size_time = time.time()
    usage = (self.get_num_workers(), self.get_max_workers())
    self.logger.debug("number of processings: %s, max number of processings: %s" % usage)
0072
def get_finishing_processings(self):
    """
    Fetch terminating/synchronizing processings and hand them to workers.

    Nothing is fetched when ``is_ok_to_run_more_processings()`` is false or
    when ``BaseAgent.min_request_id`` is still ``None``. Otherwise up to
    ``self.retrieve_bulk_size`` processings are retrieved with locking and
    each one is submitted to ``process_terminated_processing`` (status
    Terminating) or ``process_sync_processing`` (status Synchronizing).

    Database exceptions are logged and not re-raised; an ORA-00060 deadlock
    is reported as a warning only.

    :returns: always an empty list; the work is dispatched via ``submit``.
    """
    try:
        if not self.is_ok_to_run_more_processings():
            return []

        self.show_queue_size()

        if BaseAgent.min_request_id is None:
            return []

        processing_status = [ProcessingStatus.Terminating, ProcessingStatus.Synchronizing]

        processings = core_processings.get_processings_by_status(status=processing_status,
                                                                 locking=True, update_poll=True,
                                                                 min_request_id=BaseAgent.min_request_id,
                                                                 bulk_size=self.retrieve_bulk_size)

        # Dispatch only when something came back, so an empty or None result
        # is simply skipped instead of being iterated.
        if processings:
            processing_ids = [pr['processing_id'] for pr in processings]
            self.logger.info("Main thread get terminating/synchronizing processings to process: %s" % (str(processing_ids)))

            for pr in processings:
                pr_status = pr['status']
                if pr_status == ProcessingStatus.Terminating:
                    self.submit(self.process_terminated_processing, processing=pr)
                elif pr_status == ProcessingStatus.Synchronizing:
                    self.submit(self.process_sync_processing, processing=pr)
    except exceptions.DatabaseException as ex:
        if 'ORA-00060' in str(ex):
            # Logger.warn is a deprecated alias; use warning().
            self.logger.warning("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
        else:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
    return []
0112
def handle_sync_processing(self, processing, log_prefix=""):
    """
    Synchronize a processing and assemble the update payload.

    :param processing: processing record (dict-like) to synchronize.
    :param log_prefix: prefix forwarded to ``sync_processing`` for logging.

    :returns: on success a dict with 'update_processing' (new status, lock
              set to Idle), 'update_collections' and 'messages'. If anything
              raises, the exception is logged and a dict holding only
              'update_processing' is returned, with status
              ``ProcessingStatus.Running``, lock Idle and the truncated
              message stored under ``errors['sync_err']``.
    """
    try:
        outcome = sync_processing(processing, self.agent_attributes, logger=self.logger, log_prefix=log_prefix)
        processing, update_collections, messages = outcome

        return {
            'update_processing': {
                'processing_id': processing['processing_id'],
                'parameters': {
                    'status': processing['status'],
                    'locking': ProcessingLocking.Idle,
                },
            },
            'update_collections': update_collections,
            'messages': messages,
        }
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())

        failure_msg = truncate_string(str(ex), length=200)
        processing_id = processing['processing_id']
        # An existing error dict is reused, so the new entry is merged into
        # that same object.
        recorded_errors = processing['errors'] or {}
        recorded_errors['sync_err'] = {'msg': failure_msg}

        return {
            'update_processing': {
                'processing_id': processing_id,
                'parameters': {
                    'status': ProcessingStatus.Running,
                    'locking': ProcessingLocking.Idle,
                    'errors': recorded_errors,
                },
            },
        }
0139
def handle_sync_iprocessing(self, processing, log_prefix=""):
    """
    Synchronize a processing via ``sync_iprocessing`` and build the update payload.

    :param processing: processing record (dict-like) to synchronize.
    :param log_prefix: prefix forwarded to ``sync_iprocessing`` for logging.

    :returns: on success a dict with 'update_processing' (new status, lock
              set to Idle), 'update_collections' and 'messages'. If anything
              raises, the exception is logged and a dict holding only
              'update_processing' is returned, with status
              ``ProcessingStatus.Running``, lock Idle and the truncated
              message stored under ``errors['sync_err']``.
    """
    try:
        outcome = sync_iprocessing(processing, self.agent_attributes, logger=self.logger, log_prefix=log_prefix)
        processing, update_collections, messages = outcome

        return {
            'update_processing': {
                'processing_id': processing['processing_id'],
                'parameters': {
                    'status': processing['status'],
                    'locking': ProcessingLocking.Idle,
                },
            },
            'update_collections': update_collections,
            'messages': messages,
        }
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())

        failure_msg = truncate_string(str(ex), length=200)
        processing_id = processing['processing_id']
        # An existing error dict is reused, so the new entry is merged into
        # that same object.
        recorded_errors = processing['errors'] or {}
        recorded_errors['sync_err'] = {'msg': failure_msg}

        return {
            'update_processing': {
                'processing_id': processing_id,
                'parameters': {
                    'status': ProcessingStatus.Running,
                    'locking': ProcessingLocking.Idle,
                    'errors': recorded_errors,
                },
            },
        }
0166
def process_sync_processing(self, event=None, processing=None):
    """
    Synchronize one processing, given either an event or the processing itself.

    When only ``event`` is supplied, the processing is loaded with locking
    by ``event._processing_id``. A processing that cannot be found is
    logged as an error; one whose status is already in
    ``Terminated_processing_status`` only has its lock released. Any other
    processing is synchronized through ``handle_sync_iprocessing``
    (iWorkflow / iWork types) or ``handle_sync_processing`` and the result
    is stored with ``update_processing``.

    :param event: event carrying ``_processing_id``; used when ``processing``
                  is None.
    :param processing: processing record to synchronize directly.

    :returns: ``ReturnCode.Ok.value``, or ``ReturnCode.Failed.value`` when an
              exception was raised (it is logged, not propagated).
    """
    self.number_workers += 1
    pro_ret = ReturnCode.Ok.value
    try:
        if processing is None and event:
            self.logger.info("process_sync_processing: event: %s" % event)
            found = self.get_processing(processing_id=event._processing_id, exclude_status=[ProcessingStatus.Prepared], locking=True)
            if not found:
                # A missing processing is still reported as Ok.
                self.logger.error("Cannot find processing for event: %s" % str(event))
            elif found['status'] in Terminated_processing_status:
                # Already terminated: nothing to sync, just release the lock.
                unlock = {
                    'update_processing': {
                        'processing_id': found['processing_id'],
                        'parameters': {'locking': ProcessingLocking.Idle},
                    },
                    'update_contents': [],
                }
                self.update_processing(unlock, found, renew_updated_at=True)
            else:
                processing = found

        if processing:
            log_pre = self.get_log_prefix(processing)
            self.logger.info(log_pre + "process_sync_processing")

            ptype = processing['processing_type']
            if ptype and ptype in [ProcessingType.iWorkflow, ProcessingType.iWork]:
                ret = self.handle_sync_iprocessing(processing, log_prefix=log_pre)
            else:
                ret = self.handle_sync_processing(processing, log_prefix=log_pre)

            # Messages are left out of the log line only; they are still
            # part of what gets stored.
            loggable = {key: ret[key] for key in ret if key != 'messages'}
            self.logger.info(log_pre + "process_sync_processing result: %s" % str(loggable))

            self.update_processing(ret, processing)
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())
        pro_ret = ReturnCode.Failed.value
    self.number_workers -= 1
    return pro_ret
0215
def handle_terminated_processing(self, processing, log_prefix=""):
    """
    Run the terminating sync of a processing and build the update payload.

    ``sync_processing`` is called with ``terminate=True``.

    :param processing: processing record (dict-like) to finalize.
    :param log_prefix: prefix forwarded to ``sync_processing`` for logging.

    :returns: on success a dict with 'update_processing' (new status, lock
              set to Idle), 'update_collections' and 'messages'. If anything
              raises, the exception is logged and a dict holding only
              'update_processing' is returned, with status
              ``ProcessingStatus.Running``, lock Idle and the truncated
              message stored under ``errors['term_err']``.
    """
    try:
        outcome = sync_processing(processing, self.agent_attributes, terminate=True, logger=self.logger, log_prefix=log_prefix)
        processing, update_collections, messages = outcome

        return {
            'update_processing': {
                'processing_id': processing['processing_id'],
                'parameters': {
                    'status': processing['status'],
                    'locking': ProcessingLocking.Idle,
                },
            },
            'update_collections': update_collections,
            'messages': messages,
        }
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())

        failure_msg = truncate_string(str(ex), length=200)
        processing_id = processing['processing_id']
        # An existing error dict is reused, so the new entry is merged into
        # that same object.
        recorded_errors = processing['errors'] or {}
        recorded_errors['term_err'] = {'msg': failure_msg}

        return {
            'update_processing': {
                'processing_id': processing_id,
                'parameters': {
                    'status': ProcessingStatus.Running,
                    'locking': ProcessingLocking.Idle,
                    'errors': recorded_errors,
                },
            },
        }
0243
def handle_terminated_iprocessing(self, processing, log_prefix=""):
    """
    Run the terminating sync via ``sync_iprocessing`` and build the update payload.

    ``sync_iprocessing`` is called with ``terminate=True``.

    :param processing: processing record (dict-like) to finalize.
    :param log_prefix: prefix forwarded to ``sync_iprocessing`` for logging.

    :returns: on success a dict with 'update_processing' (new status, lock
              set to Idle), 'update_collections' and 'messages'. If anything
              raises, the exception is logged and a dict holding only
              'update_processing' is returned, with status
              ``ProcessingStatus.Running``, lock Idle and the truncated
              message stored under ``errors['term_err']``.
    """
    try:
        outcome = sync_iprocessing(processing, self.agent_attributes, terminate=True, logger=self.logger, log_prefix=log_prefix)
        processing, update_collections, messages = outcome

        return {
            'update_processing': {
                'processing_id': processing['processing_id'],
                'parameters': {
                    'status': processing['status'],
                    'locking': ProcessingLocking.Idle,
                },
            },
            'update_collections': update_collections,
            'messages': messages,
        }
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())

        failure_msg = truncate_string(str(ex), length=200)
        processing_id = processing['processing_id']
        # An existing error dict is reused, so the new entry is merged into
        # that same object.
        recorded_errors = processing['errors'] or {}
        recorded_errors['term_err'] = {'msg': failure_msg}

        return {
            'update_processing': {
                'processing_id': processing_id,
                'parameters': {
                    'status': ProcessingStatus.Running,
                    'locking': ProcessingLocking.Idle,
                    'errors': recorded_errors,
                },
            },
        }
0271
def process_terminated_processing(self, event=None, processing=None):
    """
    Finalize one terminating processing, given an event or the processing itself.

    When only ``event`` is supplied and its ``_counter`` is not above 3, the
    processing is loaded with locking by ``event._processing_id``. A
    processing that cannot be found is logged as an error; one whose status
    is already in ``Terminated_processing_status`` only has its lock
    released. Any other processing is finalized through
    ``handle_terminated_iprocessing`` (iWorkflow / iWork types) or
    ``handle_terminated_processing``, the result is stored with
    ``update_processing`` and an ``UpdateTransformEvent`` is sent.

    For non-iWorkflow/iWork processings whose status is not Finished,
    Failed, SubFinished or Broken, a terminating ``UpdateProcessingEvent``
    is sent as well. Its counter is the incoming event's counter plus one,
    or 0 when the method was called without an event.

    :param event: event carrying ``_processing_id`` and ``_counter``.
    :param processing: processing record to finalize directly.

    :returns: ``ReturnCode.Ok.value``, or ``ReturnCode.Failed.value`` when an
              exception was raised (it is logged, not propagated).
    """
    self.number_workers += 1
    pro_ret = ReturnCode.Ok.value
    try:
        if processing is None and event:
            if event._counter > 3:
                # Logger.warn is a deprecated alias; use warning().
                self.logger.warning("Event counter is bigger than 3, skip event: %s" % str(event))
            else:
                pr = self.get_processing(processing_id=event._processing_id, exclude_status=[ProcessingStatus.Prepared], locking=True)
                if not pr:
                    # A missing processing is still reported as Ok.
                    self.logger.error("Cannot find processing for event: %s" % str(event))
                elif pr['status'] in Terminated_processing_status:
                    # Already terminated: just release the lock.
                    parameters = {'locking': ProcessingLocking.Idle}
                    update_processing = {'processing_id': pr['processing_id'],
                                         'parameters': parameters}
                    ret = {'update_processing': update_processing,
                           'update_contents': []}
                    self.update_processing(ret, pr)
                else:
                    processing = pr

        if processing:
            pr = processing
            log_pre = self.get_log_prefix(pr)

            self.logger.info(log_pre + "process_terminated_processing")
            is_iprocessing = pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]
            if is_iprocessing:
                ret = self.handle_terminated_iprocessing(pr, log_prefix=log_pre)
            else:
                ret = self.handle_terminated_processing(pr, log_prefix=log_pre)

            # Messages are left out of the log line only.
            ret_copy = {key: ret[key] for key in ret if key != 'messages'}
            self.logger.info(log_pre + "process_terminated_processing result: %s" % str(ret_copy))

            self.update_processing(ret, pr)

            # Outgoing events get their own names. Rebinding ``event`` here
            # made the retry counter below read the new transform event's
            # counter instead of the incoming event's, so the
            # ``_counter > 3`` guard above could never take effect.
            self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
            transform_event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'])
            self.event_bus.send(transform_event)

            if not is_iprocessing:
                if pr['status'] not in [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.SubFinished, ProcessingStatus.Broken]:
                    # Not in a final state: schedule another terminating pass.
                    self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % pr['processing_id'])
                    next_counter = event._counter + 1 if event else 0
                    retry_event = UpdateProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'],
                                                        counter=next_counter)
                    retry_event.set_terminating()
                    self.event_bus.send(retry_event)
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())
        pro_ret = ReturnCode.Failed.value
    self.number_workers -= 1
    return pro_ret
0331
def handle_abort_processing(self, processing, log_prefix=""):
    """
    Abort a processing and build the update payload.

    Delegates to the imported ``handle_abort_processing`` helper function.

    :param processing: processing record (dict-like) to abort.
    :param log_prefix: prefix forwarded to the helper for logging.

    :returns: on success a dict with 'update_processing' (new status, lock
              set to Idle), 'update_collections', 'update_contents' and
              'messages'. If anything raises, the exception is logged and a
              dict holding only 'update_processing' is returned, with status
              ``ProcessingStatus.ToCancel``, lock Idle and the truncated
              message stored under ``errors['abort_err']``.
    """
    try:
        outcome = handle_abort_processing(processing, self.agent_attributes, logger=self.logger, log_prefix=log_prefix)
        processing, update_collections, update_contents, messages = outcome

        return {
            'update_processing': {
                'processing_id': processing['processing_id'],
                'parameters': {
                    'status': processing['status'],
                    'locking': ProcessingLocking.Idle,
                },
            },
            'update_collections': update_collections,
            'update_contents': update_contents,
            'messages': messages,
        }
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())

        failure_msg = truncate_string(str(ex), length=200)
        processing_id = processing['processing_id']
        # An existing error dict is reused, so the new entry is merged into
        # that same object.
        recorded_errors = processing['errors'] or {}
        recorded_errors['abort_err'] = {'msg': failure_msg}

        return {
            'update_processing': {
                'processing_id': processing_id,
                'parameters': {
                    'status': ProcessingStatus.ToCancel,
                    'locking': ProcessingLocking.Idle,
                    'errors': recorded_errors,
                },
            },
        }
0360
def handle_abort_iprocessing(self, processing, log_prefix=""):
    """
    Abort a processing through its poller plugin and build the update payload.

    The plugin is looked up as ``<processing_type name, lower-cased>_poller``
    and passed to the imported ``handle_abort_iprocessing`` helper function.

    :param processing: processing record (dict-like) to abort.
    :param log_prefix: prefix forwarded to the helper for logging.

    :returns: on success a dict with 'update_processing' (status returned by
              the helper, substatus ``ProcessingStatus.ToCancel``, lock
              Idle), 'update_collections', 'update_contents' and 'messages'.
              If anything raises - including a processing without a
              processing type - the exception is logged and a dict holding
              only 'update_processing' is returned, with status
              ``ProcessingStatus.ToCancel``, lock Idle and the truncated
              message stored under ``errors['abort_err']``.
    """
    try:
        processing_type = processing['processing_type']
        if not processing_type:
            raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing_type)
        plugin = self.get_plugin(processing_type.name.lower() + '_poller')

        outcome = handle_abort_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix)
        processing_status, update_collections, update_contents, messages = outcome

        return {
            'update_processing': {
                'processing_id': processing['processing_id'],
                'parameters': {
                    'status': processing_status,
                    'substatus': ProcessingStatus.ToCancel,
                    'locking': ProcessingLocking.Idle,
                },
            },
            'update_collections': update_collections,
            'update_contents': update_contents,
            'messages': messages,
        }
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())

        failure_msg = truncate_string(str(ex), length=200)
        processing_id = processing['processing_id']
        # An existing error dict is reused, so the new entry is merged into
        # that same object.
        recorded_errors = processing['errors'] or {}
        recorded_errors['abort_err'] = {'msg': failure_msg}

        return {
            'update_processing': {
                'processing_id': processing_id,
                'parameters': {
                    'status': ProcessingStatus.ToCancel,
                    'locking': ProcessingLocking.Idle,
                    'errors': recorded_errors,
                },
            },
        }
0397
def process_abort_processing(self, event):
    """
    Abort the processing referenced by an event.

    The processing is loaded with locking by ``event._processing_id``. If it
    cannot be found, an error is logged and ``ReturnCode.Locked.value`` is
    returned. If its status is Finished, Failed, Lost, Cancelled, Suspended,
    Expired or Broken, only the lock is released and an 'abort_err' message
    is recorded. Otherwise the abort is carried out by
    ``handle_abort_iprocessing`` (iWorkflow / iWork types) or
    ``handle_abort_processing``, the result is stored with
    ``update_processing`` and an ``UpdateTransformEvent`` carrying the
    incoming event's content is sent.

    :param event: event carrying ``_processing_id`` and ``_content``; a
                  falsy event makes the call a no-op.

    :returns: ``ReturnCode.Ok.value``, ``ReturnCode.Locked.value`` when the
              processing was not found, or ``ReturnCode.Failed.value`` when
              an exception was raised (it is logged, not propagated).
    """
    self.number_workers += 1
    pro_ret = ReturnCode.Ok.value
    try:
        if event:
            terminated_states = [ProcessingStatus.Finished, ProcessingStatus.Failed,
                                 ProcessingStatus.Lost, ProcessingStatus.Cancelled,
                                 ProcessingStatus.Suspended, ProcessingStatus.Expired,
                                 ProcessingStatus.Broken]

            pr = self.get_processing(processing_id=event._processing_id, exclude_status=[ProcessingStatus.Prepared], locking=True)

            if not pr:
                self.logger.error("Cannot find processing for event: %s" % str(event))
                pro_ret = ReturnCode.Locked.value
            else:
                log_pre = self.get_log_prefix(pr)
                self.logger.info(log_pre + "process_abort_processing")

                if pr['status'] in terminated_states:
                    # Nothing left to abort: release the lock and note why.
                    refusal = {'abort_err': {'msg': truncate_string("Processing is already terminated. Cannot be aborted", length=200)}}
                    ret = {'update_processing': {'processing_id': pr['processing_id'],
                                                 'parameters': {'locking': ProcessingLocking.Idle,
                                                                'errors': refusal}}}
                    self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret))
                    self.update_processing(ret, pr)
                else:
                    if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
                        ret = self.handle_abort_iprocessing(pr, log_prefix=log_pre)
                        self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret))
                        self.update_processing(ret, pr, use_bulk_update_mappings=False)
                    else:
                        ret = self.handle_abort_processing(pr, log_prefix=log_pre)
                        # Messages are left out of the log line only.
                        loggable = {key: ret[key] for key in ret if key != 'messages'}
                        self.logger.info(log_pre + "process_abort_processing result: %s" % str(loggable))
                        self.update_processing(ret, pr)

                    # Both branches notify the transform in the same way.
                    self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
                    transform_event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content if event else None)
                    self.event_bus.send(transform_event)
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())
        pro_ret = ReturnCode.Failed.value
    self.number_workers -= 1
    return pro_ret
0452
def handle_resume_processing(self, processing, log_prefix=""):
    """
    Resume a processing and build the update payload.

    Delegates to the imported ``handle_resume_processing`` helper function.

    :param processing: processing record (dict-like) to resume.
    :param log_prefix: prefix forwarded to the helper for logging.

    :returns: on success a dict with 'update_processing' (new status, lock
              set to Idle), 'update_collections' and 'update_contents'. If
              anything raises, the exception is logged and a dict holding
              only 'update_processing' is returned, with status
              ``ProcessingStatus.ToResume``, lock Idle and the truncated
              message stored under ``errors['resume_err']``.
    """
    try:
        outcome = handle_resume_processing(processing, self.agent_attributes, logger=self.logger, log_prefix=log_prefix)
        processing, update_collections, update_contents = outcome

        return {
            'update_processing': {
                'processing_id': processing['processing_id'],
                'parameters': {
                    'status': processing['status'],
                    'locking': ProcessingLocking.Idle,
                },
            },
            'update_collections': update_collections,
            'update_contents': update_contents,
        }
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())

        failure_msg = truncate_string(str(ex), length=200)
        processing_id = processing['processing_id']
        # An existing error dict is reused, so the new entry is merged into
        # that same object.
        recorded_errors = processing['errors'] or {}
        recorded_errors['resume_err'] = {'msg': failure_msg}

        return {
            'update_processing': {
                'processing_id': processing_id,
                'parameters': {
                    'status': ProcessingStatus.ToResume,
                    'locking': ProcessingLocking.Idle,
                    'errors': recorded_errors,
                },
            },
        }
0480
def handle_resume_iprocessing(self, processing, log_prefix=""):
    """
    Resume a processing through its poller plugin and build the update payload.

    The plugin is looked up as ``<processing_type name, lower-cased>_poller``
    and passed to the imported ``handle_resume_iprocessing`` helper function.

    :param processing: processing record (dict-like) to resume.
    :param log_prefix: prefix forwarded to the helper for logging.

    :returns: on success a dict with 'update_processing' (status returned by
              the helper, substatus ``ProcessingStatus.ToResume``, lock
              Idle), 'update_collections' and 'update_contents'. If anything
              raises - including a processing without a processing type -
              the exception is logged and a dict holding only
              'update_processing' is returned, with status
              ``ProcessingStatus.ToResume``, lock Idle and the truncated
              message stored under ``errors['resume_err']``.
    """
    try:
        processing_type = processing['processing_type']
        if not processing_type:
            raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing_type)
        plugin = self.get_plugin(processing_type.name.lower() + '_poller')

        outcome = handle_resume_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix)
        processing_status, update_collections, update_contents = outcome

        return {
            'update_processing': {
                'processing_id': processing['processing_id'],
                'parameters': {
                    'status': processing_status,
                    'substatus': ProcessingStatus.ToResume,
                    'locking': ProcessingLocking.Idle,
                },
            },
            'update_collections': update_collections,
            'update_contents': update_contents,
        }
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())

        failure_msg = truncate_string(str(ex), length=200)
        processing_id = processing['processing_id']
        # An existing error dict is reused, so the new entry is merged into
        # that same object.
        recorded_errors = processing['errors'] or {}
        recorded_errors['resume_err'] = {'msg': failure_msg}

        return {
            'update_processing': {
                'processing_id': processing_id,
                'parameters': {
                    'status': ProcessingStatus.ToResume,
                    'locking': ProcessingLocking.Idle,
                    'errors': recorded_errors,
                },
            },
        }
0516
def process_resume_processing(self, event):
    """
    Resume the processing referenced by an event.

    The processing is loaded with locking by ``event._processing_id``. If it
    cannot be found, an error is logged and ``ReturnCode.Locked.value`` is
    returned. If its status is Finished, only the lock is released and a
    'resume_err' message is recorded. Otherwise the resume is carried out by
    ``handle_resume_iprocessing`` (iWorkflow / iWork types) or
    ``handle_resume_processing``, the result is stored with
    ``update_processing`` and an ``UpdateTransformEvent`` carrying the
    incoming event's content is sent.

    :param event: event carrying ``_processing_id`` and ``_content``; a
                  falsy event makes the call a no-op.

    :returns: ``ReturnCode.Ok.value``, ``ReturnCode.Locked.value`` when the
              processing was not found, or ``ReturnCode.Failed.value`` when
              an exception was raised (it is logged, not propagated).
    """
    self.number_workers += 1
    pro_ret = ReturnCode.Ok.value
    try:
        if event:
            processing_status = [ProcessingStatus.Finished]

            pr = self.get_processing(processing_id=event._processing_id, exclude_status=[ProcessingStatus.Prepared], locking=True)

            if not pr:
                self.logger.error("Cannot find processing for event: %s" % str(event))
                pro_ret = ReturnCode.Locked.value
            else:
                log_pre = self.get_log_prefix(pr)
                self.logger.info(log_pre + "process_resume_processing")

                if pr['status'] in processing_status:
                    # Recorded under 'resume_err', the key the resume
                    # handlers use; it used to be filed under 'abort_err',
                    # copied from the abort path.
                    refusal = {'resume_err': {'msg': truncate_string("Processing has already finished. Cannot be resumed", length=200)}}
                    update_processing = {'processing_id': pr['processing_id'],
                                         'parameters': {'locking': ProcessingLocking.Idle,
                                                        'errors': refusal}}
                    ret = {'update_processing': update_processing}

                    self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))

                    self.update_processing(ret, pr)
                else:
                    # The two processing flavours differ only in the handler.
                    if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
                        ret = self.handle_resume_iprocessing(pr, log_prefix=log_pre)
                    else:
                        ret = self.handle_resume_processing(pr, log_prefix=log_pre)
                    self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))

                    self.update_processing(ret, pr, use_bulk_update_mappings=False)

                    self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
                    transform_event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content if event else None)
                    self.event_bus.send(transform_event)
    except Exception as ex:
        self.logger.error(ex)
        self.logger.error(traceback.format_exc())
        pro_ret = ReturnCode.Failed.value
    self.number_workers -= 1
    return pro_ret
0567
def init_event_function_map(self):
    """
    Build ``self.event_func_map``, the event-type to handler table.

    Every entry uses ``is_ok_to_run_more_processings`` as its 'pre_check'
    and the matching ``process_*_processing`` method as its 'exec_func'.
    """
    handlers = (
        (EventType.SyncProcessing, self.process_sync_processing),
        (EventType.TerminatedProcessing, self.process_terminated_processing),
        (EventType.AbortProcessing, self.process_abort_processing),
        (EventType.ResumeProcessing, self.process_resume_processing),
    )
    self.event_func_map = {
        event_type: {
            'pre_check': self.is_ok_to_run_more_processings,
            'exec_func': exec_func,
        }
        for event_type, exec_func in handlers
    }
0587
def run(self):
    """
    Main run function.

    Initializes thread info, plugins, the agent, the default tasks and the
    event function map, then registers two periodic tasks -
    ``get_finishing_processings`` (delay 10) and ``load_min_request_id``
    (delay 600), both with priority 1 - and calls ``execute()``. A
    ``KeyboardInterrupt`` raised anywhere in that sequence leads to
    ``stop()``.
    """
    try:
        self.logger.info("Starting main thread")

        for setup_step in (self.init_thread_info, self.load_plugins, self.init,
                           self.add_default_tasks, self.init_event_function_map):
            setup_step()

        # (method name, delay_time) of each periodic task. The method is
        # looked up only when its task is created.
        periodic = (("get_finishing_processings", 10), ("load_min_request_id", 600))
        for method_name, delay in periodic:
            task = self.create_task(task_func=getattr(self, method_name), task_output_queue=None,
                                    task_args=tuple(), task_kwargs={}, delay_time=delay, priority=1)
            self.add_task(task)

        self.execute()
    except KeyboardInterrupt:
        self.stop()
0612
0613
if __name__ == '__main__':
    # Script entry point: build a Finisher with its default settings and
    # invoke the instance.
    agent = Finisher()
    agent()