Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 import datetime
0012 import random
0013 import time
0014 import traceback
0015 
0016 from idds.common import exceptions
0017 from idds.common.constants import (Sections, ReturnCode, ProcessingType,
0018                                    ProcessingStatus, ProcessingLocking,
0019                                    Terminated_processing_status)
0020 from idds.common.utils import setup_logging, truncate_string, json_dumps
0021 from idds.core import processings as core_processings
0022 from idds.agents.common.baseagent import BaseAgent
0023 from idds.agents.common.eventbus.event import (EventType,
0024                                                # UpdateProcessingEvent,
0025                                                TriggerProcessingEvent,
0026                                                SyncProcessingEvent,
0027                                                TerminatedProcessingEvent)
0028 
0029 from .utils import handle_update_processing_new, is_process_terminated, is_process_finished
0030 from .iutils import handle_update_iprocessing
0031 
0032 setup_logging(__name__)
0033 
0034 
0035 class Poller(BaseAgent):
0036     """
0037     Poller works to poll submitted and running tasks in WFMS.
0038     """
0039 
0040     def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
0041                  max_updates_per_round=2000, name='Poller', message_bulk_size=1000, locking_period=1800,
0042                  use_process_pool=False, poller_max_number_workers=None, **kwargs):
0043         if poller_max_number_workers:
0044             self.max_number_workers = int(poller_max_number_workers)
0045         else:
0046             self.max_number_workers = int(max_number_workers)
0047         if int(num_threads) < int(self.max_number_workers):
0048             num_threads = int(self.max_number_workers)
0049 
0050         self.set_max_workers()
0051 
0052         super(Poller, self).__init__(num_threads=num_threads, name=name, use_process_pool=False, **kwargs)
0053         self.config_section = Sections.Carrier
0054         self.poll_period = int(poll_period)
0055         self.locking_period = int(locking_period)
0056         self.retries = int(retries)
0057         self.retrieve_bulk_size = int(retrieve_bulk_size)
0058         self.message_bulk_size = int(message_bulk_size)
0059 
0060         if not hasattr(self, 'new_poll_period') or not self.new_poll_period:
0061             self.new_poll_period = 10
0062         else:
0063             self.new_poll_period = int(self.new_poll_period)
0064         if not hasattr(self, 'new_fail_poll_period') or not self.new_fail_poll_period:
0065             self.new_fail_poll_period = 120
0066         else:
0067             self.new_fail_poll_period = int(self.new_fail_poll_period)
0068         if not hasattr(self, 'update_poll_period') or not self.update_poll_period:
0069             self.update_poll_period = self.poll_period
0070         else:
0071             self.update_poll_period = int(self.update_poll_period)
0072 
0073         if not hasattr(self, 'update_poll_period_for_new_task') or not self.update_poll_period_for_new_task:
0074             self.update_poll_period_for_new_task = 180
0075         else:
0076             self.update_poll_period_for_new_task = int(self.update_poll_period_for_new_task)
0077 
0078         if hasattr(self, 'poll_period_increase_rate'):
0079             self.poll_period_increase_rate = float(self.poll_period_increase_rate)
0080         else:
0081             self.poll_period_increase_rate = 2
0082 
0083         if hasattr(self, 'max_new_poll_period'):
0084             self.max_new_poll_period = int(self.max_new_poll_period)
0085         else:
0086             self.max_new_poll_period = 3600 * 6
0087         if hasattr(self, 'max_update_poll_period'):
0088             self.max_update_poll_period = int(self.max_update_poll_period)
0089         else:
0090             self.max_update_poll_period = 3600 * 6
0091 
0092         self.number_workers = 0
0093         if not hasattr(self, 'max_number_workers') or not self.max_number_workers:
0094             self.max_number_workers = 3
0095         else:
0096             self.max_number_workers = int(self.max_number_workers)
0097 
0098         self.max_updates_per_round = int(max_updates_per_round)
0099         self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round)
0100 
0101         if not hasattr(self, 'enable_executors') or not self.enable_executors:
0102             self.enable_executors = False
0103         else:
0104             if str(self.enable_executors).lower() == 'true':
0105                 self.enable_executors = True
0106             else:
0107                 self.enable_executors = False
0108         self.logger.info("enable_executors: %s" % self.enable_executors)
0109 
0110         self.show_queue_size_time = None
0111 
0112         self.extra_executors = None
0113 
0114         self._running_processing_status = None
0115 
0116         if hasattr(self, 'clean_locks_time_period'):
0117             self.clean_locks_time_period = int(self.clean_locks_time_period)
0118         else:
0119             self.clean_locks_time_period = 1800
0120 
0121     def get_extra_executors(self):
0122         if self.enable_executors:
0123             if self.extra_executors is None:
0124                 name = self.executor_name + "_Extra"
0125                 self.extra_executors = self.create_executors(name, max_workers=self.num_threads)
0126         return self.extra_executors
0127 
0128     def is_ok_to_run_more_processings(self):
0129         if self.get_num_free_workers() > 0:
0130             return True
0131         return False
0132 
0133     def get_bulk_size(self):
0134         return min(self.retrieve_bulk_size, self.get_num_free_workers())
0135 
0136     def show_queue_size(self):
0137         if self.show_queue_size_time is None or time.time() - self.show_queue_size_time >= 600:
0138             self.show_queue_size_time = time.time()
0139 
0140             exec_max_workers = self.executors.get_max_workers()
0141             exec_num_workers = self.executors.get_num_workers()
0142             q_str = "number of processings: %s, max number of processings: %s" % (exec_num_workers, exec_max_workers)
0143             self.logger.debug(q_str)
0144 
0145     def init(self):
0146         try:
0147             status = [ProcessingStatus.New, ProcessingStatus.Submitting, ProcessingStatus.Submitted,
0148                       ProcessingStatus.Running, ProcessingStatus.FinishedOnExec]
0149             core_processings.clean_next_poll_at(status)
0150         except Exception as ex:
0151             self.logger.info(f"Failed clean next_poll_at: {ex}")
0152 
0153     def get_running_processings(self):
0154         """
0155         Get running processing
0156         """
0157         try:
0158             if not self.is_ok_to_run_more_processings():
0159                 return []
0160 
0161             self.show_queue_size()
0162 
0163             if BaseAgent.min_request_id is None:
0164                 return []
0165 
0166             processing_status = [ProcessingStatus.Submitting, ProcessingStatus.Submitted,
0167                                  ProcessingStatus.Running, ProcessingStatus.FinishedOnExec,
0168                                  ProcessingStatus.ToCancel, ProcessingStatus.Cancelling,
0169                                  ProcessingStatus.ToSuspend, ProcessingStatus.Suspending,
0170                                  ProcessingStatus.ToResume, ProcessingStatus.Resuming,
0171                                  ProcessingStatus.ToExpire, ProcessingStatus.Expiring,
0172                                  ProcessingStatus.ToFinish, ProcessingStatus.ToForceFinish]
0173             self._running_processing_status = processing_status
0174 
0175             # next_poll_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_period)
0176             processings = core_processings.get_processings_by_status(status=processing_status,
0177                                                                      locking=True, update_poll=True,
0178                                                                      min_request_id=BaseAgent.min_request_id,
0179                                                                      bulk_size=self.get_bulk_size())
0180 
0181             # self.logger.debug("Main thread get %s [submitting + submitted + running] processings to process" % (len(processings)))
0182             if processings:
0183                 processing_ids = [pr['processing_id'] for pr in processings]
0184                 self.logger.info("Main thread get [submitting + submitted + running] processings to process: %s" % (str(processing_ids)))
0185 
0186             for pr in processings:
0187                 # pr_id = pr['processing_id']
0188                 self.submit(self.process_update_processing, **{"processing": pr})
0189 
0190             return processings
0191         except exceptions.DatabaseException as ex:
0192             if 'ORA-00060' in str(ex):
0193                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0194             else:
0195                 # raise ex
0196                 self.logger.error(ex)
0197                 self.logger.error(traceback.format_exc())
0198         return []
0199 
0200     def get_processing(self, processing_id, status=None, exclude_status=None, locking=False):
0201         try:
0202             return core_processings.get_processing_by_id_status(processing_id=processing_id,
0203                                                                 status=status,
0204                                                                 exclude_status=exclude_status,
0205                                                                 locking=locking,
0206                                                                 to_lock=True,
0207                                                                 lock_period=self.locking_period)
0208         except exceptions.DatabaseException as ex:
0209             if 'ORA-00060' in str(ex):
0210                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0211             else:
0212                 # raise ex
0213                 self.logger.error(ex)
0214                 self.logger.error(traceback.format_exc())
0215         return None
0216 
0217     def get_work_tag_attribute(self, work_tag, attribute):
0218         work_tag_attribute_value = None
0219         if work_tag:
0220             work_tag_attribute = work_tag + "_" + attribute
0221             if hasattr(self, work_tag_attribute):
0222                 work_tag_attribute_value = int(getattr(self, work_tag_attribute))
0223         return work_tag_attribute_value
0224 
0225     def load_poll_period(self, processing, parameters, new=False):
0226         if 'processing' in processing['processing_metadata']:
0227             proc = processing['processing_metadata']['processing']
0228             work = proc.work
0229         else:
0230             work = processing['processing_metadata']['work']
0231 
0232         work_tag = work.get_work_tag()
0233 
0234         work_tag_new_poll_period = self.get_work_tag_attribute(work_tag, "new_poll_period")
0235         if work_tag_new_poll_period:
0236             parameters['new_poll_period'] = work_tag_new_poll_period
0237         elif self.new_poll_period and processing['new_poll_period'] != self.new_poll_period:
0238             parameters['new_poll_period'] = self.new_poll_period
0239 
0240         if new:
0241             work_tag_update_poll_period_for_new = self.get_work_tag_attribute(work_tag, "update_poll_period_for_new_task")
0242             if work_tag_update_poll_period_for_new:
0243                 parameters['update_poll_period'] = work_tag_update_poll_period_for_new
0244             elif self.update_poll_period_for_new_task and processing['update_poll_period'] != self.update_poll_period_for_new_task:
0245                 parameters['update_poll_period'] = self.update_poll_period_for_new_task
0246         else:
0247             work_tag_update_poll_period = self.get_work_tag_attribute(work_tag, "update_poll_period")
0248             if work_tag_update_poll_period:
0249                 parameters['update_poll_period'] = work_tag_update_poll_period
0250             elif self.update_poll_period and processing['update_poll_period'] != self.update_poll_period:
0251                 parameters['update_poll_period'] = self.update_poll_period
0252         return parameters
0253 
0254     def get_log_prefix(self, processing):
0255         return "<request_id=%s,transform_id=%s,processing_id=%s>" % (processing['request_id'],
0256                                                                      processing['transform_id'],
0257                                                                      processing['processing_id'])
0258 
0259     def update_processing(self, processing, processing_model, use_bulk_update_mappings=True, renew_updated_at=False):
0260         try:
0261             if processing:
0262                 log_prefix = self.get_log_prefix(processing_model)
0263 
0264                 self.logger.info(log_prefix + "update_processing: %s" % (processing['update_processing']['parameters']))
0265 
0266                 processing['update_processing']['parameters']['locking'] = ProcessingLocking.Idle
0267                 # self.logger.debug("wen: %s" % str(processing))
0268                 if renew_updated_at:
0269                     processing['update_processing']['parameters']['updated_at'] = datetime.datetime.utcnow()
0270                 # check update_processing status
0271                 if 'status' in processing['update_processing']['parameters']:
0272                     new_status = processing['update_processing']['parameters']['status']
0273                     if new_status == ProcessingStatus.Submitting and processing_model['status'].value > ProcessingStatus.Submitting.value:
0274                         processing['update_processing']['parameters']['status'] = ProcessingStatus.Submitted
0275 
0276                 self.logger.info(log_prefix + "update_processing: %s" % (processing['update_processing']['parameters']))
0277 
0278                 retry = True
0279                 retry_num = 0
0280                 while retry:
0281                     retry = False
0282                     retry_num += 1
0283                     try:
0284                         core_processings.update_processing_contents(update_processing=processing.get('update_processing', None),
0285                                                                     request_id=processing_model['request_id'],
0286                                                                     update_collections=processing.get('update_collections', None),
0287                                                                     update_contents=processing.get('update_contents', None),
0288                                                                     update_dep_contents=processing.get('update_dep_contents', None),
0289                                                                     messages=processing.get('messages', None),
0290                                                                     update_messages=processing.get('update_messages', None),
0291                                                                     new_contents=processing.get('new_contents', None),
0292                                                                     new_update_contents=processing.get('new_update_contents', None),
0293                                                                     new_contents_ext=processing.get('new_contents_ext', None),
0294                                                                     update_contents_ext=processing.get('update_contents_ext', None),
0295                                                                     new_input_dependency_contents=processing.get('new_input_dependency_contents', None),
0296                                                                     use_bulk_update_mappings=use_bulk_update_mappings,
0297                                                                     message_bulk_size=self.message_bulk_size)
0298                     except exceptions.DatabaseException as ex:
0299                         if 'ORA-00060' in str(ex):
0300                             self.logger.warn(log_prefix + "update_processing (cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0301                         else:
0302                             self.logger.error(ex)
0303                             self.logger.error(traceback.format_exc())
0304                         if retry_num < 5:
0305                             retry = True
0306                             if retry_num <= 1:
0307                                 random_sleep = random.randint(1, 10)
0308                             elif retry_num <= 2:
0309                                 random_sleep = random.randint(1, 60)
0310                             else:
0311                                 random_sleep = random.randint(1, 120)
0312                             self.logger.error(f"{log_prefix} retry_num: {retry_num} random_sleep: {random_sleep}")
0313                             time.sleep(random_sleep)
0314                         else:
0315                             raise ex
0316         except Exception as ex:
0317             self.logger.error(ex)
0318             self.logger.error(traceback.format_exc())
0319             self.logger.warn("Failed to update_processings: %s" % json_dumps(processing))
0320             try:
0321                 processing_id = processing['update_processing']['processing_id']
0322 
0323                 parameters = {'status': processing['update_processing']['parameters']['status'],
0324                               'locking': ProcessingLocking.Idle}
0325                 if 'new_retries' in processing['update_processing']['parameters']:
0326                     parameters['new_retries'] = processing['update_processing']['parameters']['new_retries']
0327                 if 'update_retries' in processing['update_processing']['parameters']:
0328                     parameters['update_retries'] = processing['update_processing']['parameters']['update_retries']
0329                 if 'errors' in processing['update_processing']['parameters']:
0330                     parameters['errors'] = processing['update_processing']['parameters']['errors']
0331 
0332                 self.logger.warn(log_prefix + "update_processing exception result: %s" % (parameters))
0333                 core_processings.update_processing(processing_id=processing_id, parameters=parameters)
0334             except Exception as ex:
0335                 self.logger.error(ex)
0336                 self.logger.error(traceback.format_exc())
0337 
0338     def handle_update_processing(self, processing):
0339         try:
0340             log_prefix = self.get_log_prefix(processing)
0341             executors = None
0342             if self.enable_executors:
0343                 executors = self.get_extra_executors()
0344 
0345             ret_handle_update_processing = handle_update_processing_new(processing,
0346                                                                         self.agent_attributes,
0347                                                                         max_updates_per_round=self.max_updates_per_round,
0348                                                                         max_jobs_per_round=self.max_updates_per_round,
0349                                                                         executors=executors,
0350                                                                         logger=self.logger,
0351                                                                         log_prefix=log_prefix)
0352 
0353             process_status, new_contents, new_input_dependency_contents, ret_msgs, update_contents, parameters, new_contents_ext, update_contents_ext = ret_handle_update_processing
0354 
0355             coll_metadata = parameters.pop("coll_metadata", None)
0356 
0357             proc = processing['processing_metadata']['processing']
0358             work = proc.work
0359 
0360             update_collections = []
0361             if work.is_data_work():
0362                 input_collections = work.get_input_collections(poll_externel=True)
0363                 output_collections = work.get_output_collections()
0364                 for output_coll in output_collections:
0365                     if not output_coll.coll_metadata:
0366                         output_coll.coll_metadata = {}
0367                     for k, v in coll_metadata:
0368                         output_coll.coll_metadata[k] = v
0369                 for coll in input_collections:
0370                     u_coll = {
0371                         'coll_id': coll.coll_id,
0372                         'total_files': coll.get("total_files", 0),
0373                         'processed_files': coll.get("availability", 0),
0374                         'processing_files': 0,
0375                         'activated_files': 0,
0376                         'preprocessing_files': 0,
0377                         'new_files': 0,
0378                         'failed_files': 0,
0379                         'missing_files': 0,
0380                         'bytes': coll.get("bytes", 0),
0381                         'ext_files': 0,
0382                         'processed_ext_files': 0,
0383                         'failed_ext_files': 0,
0384                         'missing_ext_files': 0,
0385                         'coll_metadata': coll.coll_metadata,
0386                     }
0387                     update_collections.append(u_coll)
0388                 for coll in output_collections:
0389                     u_coll = {
0390                         'coll_id': coll.coll_id,
0391                         'total_files': coll.get("total_files", 0),
0392                         'processed_files': coll.get("availability", 0),
0393                         'processing_files': coll.get("processing", 0),
0394                         'activated_files': 0,
0395                         'preprocessing_files': 0,
0396                         'new_files': 0,
0397                         'failed_files': coll.get("stuck", 0),
0398                         'missing_files': 0,
0399                         'bytes': 0,
0400                         'coll_metadata': coll.coll_metadata,
0401                     }
0402                     update_collections.append(u_coll)
0403 
0404             if work.use_dependency_to_release_jobs():
0405                 new_process_status = ProcessingStatus.Triggering
0406             else:
0407                 new_process_status = process_status
0408                 if is_process_terminated(process_status):
0409                     new_process_status = ProcessingStatus.Terminating
0410                     if is_process_finished(process_status):
0411                         new_process_status = ProcessingStatus.Terminating
0412                     else:
0413                         retries = processing['update_retries'] + 1
0414                         if processing['max_update_retries'] and retries < processing['max_update_retries']:
0415                             work.reactivate_processing(processing, log_prefix=log_prefix)
0416                             process_status = ProcessingStatus.Running
0417                             new_process_status = ProcessingStatus.Running
0418                 else:
0419                     if (update_contents or new_contents or new_contents_ext or update_contents_ext or ret_msgs):
0420                         new_process_status = ProcessingStatus.Synchronizing
0421 
0422             update_processing = {'processing_id': processing['processing_id'],
0423                                  'parameters': {'status': new_process_status,
0424                                                 'substatus': process_status,
0425                                                 'locking': ProcessingLocking.Idle}}
0426 
0427             if coll_metadata and 'error' in coll_metadata:
0428                 update_processing['parameters']['errors'] = coll_metadata['error']
0429 
0430             update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])
0431 
0432             if proc.submitted_at:
0433                 if not processing['submitted_at'] or processing['submitted_at'] < proc.submitted_at:
0434                     update_processing['parameters']['submitted_at'] = proc.submitted_at
0435 
0436             if proc.workload_id and not processing['workload_id']:
0437                 update_processing['parameters']['workload_id'] = proc.workload_id
0438 
0439             # update_processing['parameters']['expired_at'] = work.get_expired_at(processing)
0440             update_processing['parameters']['processing_metadata'] = processing['processing_metadata']
0441 
0442             if parameters:
0443                 # special parameters such as 'output_metadata'
0444                 for p in parameters:
0445                     update_processing['parameters'][p] = parameters[p]
0446 
0447             ret = {'update_processing': update_processing,
0448                    'update_contents': update_contents,
0449                    'new_contents': new_contents,
0450                    'new_input_dependency_contents': new_input_dependency_contents,
0451                    'messages': ret_msgs,
0452                    'new_contents_ext': new_contents_ext,
0453                    'update_contents_ext': update_contents_ext,
0454                    'processing_status': new_process_status}
0455 
0456         except exceptions.ProcessFormatNotSupported as ex:
0457             self.logger.error(ex)
0458             self.logger.error(traceback.format_exc())
0459 
0460             retries = processing['update_retries'] + 1
0461             if not processing['max_update_retries'] or retries < processing['max_update_retries']:
0462                 proc_status = ProcessingStatus.Running
0463             else:
0464                 proc_status = ProcessingStatus.Failed
0465             error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0466 
0467             # increase poll period
0468             update_poll_period = int(processing['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
0469             if update_poll_period > self.max_update_poll_period:
0470                 update_poll_period = self.max_update_poll_period
0471 
0472             update_processing = {'processing_id': processing['processing_id'],
0473                                  'parameters': {'status': proc_status,
0474                                                 'locking': ProcessingLocking.Idle,
0475                                                 'update_retries': retries,
0476                                                 'update_poll_period': update_poll_period,
0477                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0478             update_processing['parameters']['errors'].update(error)
0479 
0480             ret = {'update_processing': update_processing,
0481                    'update_contents': []}
0482         except Exception as ex:
0483             self.logger.error(ex)
0484             self.logger.error(traceback.format_exc())
0485 
0486             retries = processing['update_retries'] + 1
0487             if not processing['max_update_retries'] or retries < processing['max_update_retries']:
0488                 proc_status = ProcessingStatus.Running
0489             else:
0490                 proc_status = ProcessingStatus.Failed
0491             error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0492             update_processing = {'processing_id': processing['processing_id'],
0493                                  'parameters': {'status': proc_status,
0494                                                 'locking': ProcessingLocking.Idle,
0495                                                 'update_retries': retries,
0496                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0497             update_processing['parameters']['errors'].update(error)
0498             update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])
0499 
0500             ret = {'update_processing': update_processing,
0501                    'update_contents': []}
0502         return ret
0503 
0504     def handle_update_iprocessing(self, processing):
0505         try:
0506             log_prefix = self.get_log_prefix(processing)
0507 
0508             executors, plugin = None, None
0509             if processing['processing_type']:
0510                 plugin_name = processing['processing_type'].name.lower() + '_poller'
0511                 plugin = self.get_plugin(plugin_name)
0512             else:
0513                 raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])
0514 
0515             ret_handle_update_processing = handle_update_iprocessing(processing,
0516                                                                      self.agent_attributes,
0517                                                                      plugin=plugin,
0518                                                                      max_updates_per_round=self.max_updates_per_round,
0519                                                                      executors=executors,
0520                                                                      logger=self.logger,
0521                                                                      log_prefix=log_prefix)
0522 
0523             process_status, new_contents, new_input_dependency_contents, ret_msgs, update_contents, parameters, new_contents_ext, update_contents_ext = ret_handle_update_processing
0524 
0525             new_process_status = process_status
0526             if is_process_terminated(process_status):
0527                 new_process_status = ProcessingStatus.Terminating
0528                 if is_process_finished(process_status):
0529                     new_process_status = ProcessingStatus.Terminating
0530                 else:
0531                     new_process_status = ProcessingStatus.Terminating
0532 
0533             update_processing = {'processing_id': processing['processing_id'],
0534                                  'parameters': {'status': new_process_status,
0535                                                 'substatus': process_status,
0536                                                 'locking': ProcessingLocking.Idle}}
0537 
0538             update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])
0539 
0540             if 'submitted_at' in processing['processing_metadata']:
0541                 if not processing['submitted_at'] or processing['submitted_at'] < processing['processing_metadata']['submitted_at']:
0542                     parameters['submitted_at'] = processing['processing_metadata']['submitted_at']
0543 
0544             if 'workload_id' in processing['processing_metadata']:
0545                 parameters['workload_id'] = processing['processing_metadata']['workload_id']
0546 
0547             # update_processing['parameters']['expired_at'] = work.get_expired_at(processing)
0548             update_processing['parameters']['processing_metadata'] = processing['processing_metadata']
0549 
0550             if parameters:
0551                 # special parameters such as 'output_metadata'
0552                 for p in parameters:
0553                     update_processing['parameters'][p] = parameters[p]
0554 
0555             ret = {'update_processing': update_processing,
0556                    'update_contents': update_contents,
0557                    'new_contents': new_contents,
0558                    'new_input_dependency_contents': new_input_dependency_contents,
0559                    'messages': ret_msgs,
0560                    'new_contents_ext': new_contents_ext,
0561                    'update_contents_ext': update_contents_ext,
0562                    'processing_status': new_process_status}
0563         except exceptions.ProcessFormatNotSupported as ex:
0564             self.logger.error(ex)
0565             self.logger.error(traceback.format_exc())
0566 
0567             retries = processing['update_retries'] + 1
0568             if not processing['max_update_retries'] or retries < processing['max_update_retries']:
0569                 proc_status = ProcessingStatus.Running
0570             else:
0571                 proc_status = ProcessingStatus.Failed
0572             error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0573 
0574             # increase poll period
0575             update_poll_period = int(processing['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
0576             if update_poll_period > self.max_update_poll_period:
0577                 update_poll_period = self.max_update_poll_period
0578 
0579             update_processing = {'processing_id': processing['processing_id'],
0580                                  'parameters': {'status': proc_status,
0581                                                 'locking': ProcessingLocking.Idle,
0582                                                 'update_retries': retries,
0583                                                 'update_poll_period': update_poll_period,
0584                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0585             update_processing['parameters']['errors'].update(error)
0586 
0587             ret = {'update_processing': update_processing,
0588                    'update_contents': []}
0589         except Exception as ex:
0590             self.logger.error(ex)
0591             self.logger.error(traceback.format_exc())
0592 
0593             retries = processing['update_retries'] + 1
0594             if not processing['max_update_retries'] or retries < processing['max_update_retries']:
0595                 proc_status = ProcessingStatus.Running
0596             else:
0597                 proc_status = ProcessingStatus.Failed
0598             error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0599             update_processing = {'processing_id': processing['processing_id'],
0600                                  'parameters': {'status': proc_status,
0601                                                 'locking': ProcessingLocking.Idle,
0602                                                 'update_retries': retries,
0603                                                 'errors': processing['errors'] if processing['errors'] else {}}}
0604             update_processing['parameters']['errors'].update(error)
0605             update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])
0606 
0607             ret = {'update_processing': update_processing,
0608                    'update_contents': []}
0609         return ret
0610 
0611     def process_update_processing(self, event=None, processing=None):
0612         self.number_workers += 1
0613         pro_ret = ReturnCode.Ok.value
0614         try:
0615             if processing is None and event:
0616                 self.logger.info("process_update_processing, event: %s" % str(event))
0617 
0618                 pr = self.get_processing(processing_id=event._processing_id, status=None, exclude_status=[ProcessingStatus.Prepared], locking=True)
0619                 if not pr:
0620                     self.logger.warn("Cannot find processing for event: %s" % str(event))
0621                     # pro_ret = ReturnCode.Locked.value
0622                     pro_ret = ReturnCode.Ok.value
0623                 elif pr['status'] in Terminated_processing_status:
0624                     parameters = {'locking': ProcessingLocking.Idle}
0625                     update_processing = {'processing_id': pr['processing_id'],
0626                                          'parameters': parameters}
0627                     ret = {'update_processing': update_processing,
0628                            'update_contents': []}
0629                     self.update_processing(ret, pr, renew_updated_at=True)
0630                     pro_ret = ReturnCode.Ok.value
0631                 else:
0632                     processing = pr
0633             if processing:
0634                 pr = processing
0635                 log_pre = self.get_log_prefix(pr)
0636 
0637                 self.logger.info(log_pre + "process_update_processing")
0638                 if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0639                     ret = self.handle_update_iprocessing(pr)
0640                 else:
0641                     ret = self.handle_update_processing(pr)
0642                 # self.logger.info(log_pre + "process_update_processing result: %s" % str(ret))
0643 
0644                 self.update_processing(ret, pr, renew_updated_at=True)
0645 
0646                 # if 'processing_status' in ret and ret['processing_status'] == ProcessingStatus.Triggering:
0647                 if True:
0648                     # always triggering
0649                     event_content = {}
0650                     if (('update_contents' in ret and ret['update_contents']) or ('new_contents' in ret and ret['new_contents'])):
0651                         event_content['has_updates'] = True
0652                     if is_process_terminated(pr['substatus']):
0653                         event_content['Terminated'] = True
0654                         event_content['is_terminating'] = True
0655                     self.logger.info(log_pre + "TriggerProcessingEvent(processing_id: %s)" % pr['processing_id'])
0656                     event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'], content=event_content)
0657                     self.event_bus.send(event)
0658                 elif 'processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating:
0659                     self.logger.info(log_pre + "TerminatedProcessingEvent(processing_id: %s)" % pr['processing_id'])
0660                     event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
0661                     event.set_terminating()
0662                     self.event_bus.send(event)
0663                 else:
0664                     if 'processing_status' in ret and ret['processing_status'] == ProcessingStatus.Synchronizing:
0665                         self.logger.info(log_pre + "SyncProcessingEvent(processing_id: %s)" % pr['processing_id'])
0666                         event = SyncProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
0667                         event.set_has_updates()
0668                         self.event_bus.send(event)
0669         except Exception as ex:
0670             self.logger.error(ex)
0671             self.logger.error(traceback.format_exc())
0672             pro_ret = ReturnCode.Failed.value
0673         self.number_workers -= 1
0674         return pro_ret
0675 
0676     def clean_locks(self, force=False):
0677         try:
0678             self.logger.info(f"clean locking: force: {force}")
0679             health_items = self.get_health_items()
0680             min_request_id = BaseAgent.min_request_id
0681             hostname, pid, thread_id, thread_name = self.get_process_thread_info()
0682             ret = core_processings.clean_locking(health_items=health_items, min_request_id=min_request_id,
0683                                                  time_period=self.clean_locks_time_period,
0684                                                  force=force, hostname=hostname, pid=pid)
0685             self.logger.info(f"clean locking finished. Cleaned locks: {ret}")
0686         except Exception as ex:
0687             self.logger.info(f"Failed clean locking: {ex}")
0688 
0689     def init_event_function_map(self):
0690         self.event_func_map = {
0691             EventType.UpdateProcessing: {
0692                 'pre_check': self.is_ok_to_run_more_processings,
0693                 'exec_func': self.process_update_processing
0694             }
0695         }
0696 
0697     def run(self):
0698         """
0699         Main run function.
0700         """
0701         try:
0702             self.logger.info("Starting main thread")
0703             self.init_thread_info()
0704 
0705             self.load_plugins()
0706             self.init()
0707 
0708             self.clean_locks(force=True)
0709             time.sleep(5)
0710 
0711             self.add_default_tasks()
0712 
0713             self.init_event_function_map()
0714 
0715             task = self.create_task(task_func=self.get_running_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
0716             self.add_task(task)
0717 
0718             task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
0719             self.add_task(task)
0720 
0721             task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
0722             self.add_task(task)
0723 
0724             self.execute()
0725         except KeyboardInterrupt:
0726             self.stop()
0727 
0728 
if __name__ == '__main__':
    # Script entry point: build a Poller with its default arguments and call it.
    poller = Poller()
    poller()