File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import datetime
0012 import random
0013 import time
0014 import traceback
0015
0016 from idds.common import exceptions
0017 from idds.common.constants import (Sections, ReturnCode, ProcessingType,
0018 ProcessingStatus, ProcessingLocking,
0019 Terminated_processing_status)
0020 from idds.common.utils import setup_logging, truncate_string, json_dumps
0021 from idds.core import processings as core_processings
0022 from idds.agents.common.baseagent import BaseAgent
0023 from idds.agents.common.eventbus.event import (EventType,
0024
0025 TriggerProcessingEvent,
0026 SyncProcessingEvent,
0027 TerminatedProcessingEvent)
0028
0029 from .utils import handle_update_processing_new, is_process_terminated, is_process_finished
0030 from .iutils import handle_update_iprocessing

# Configure module-level logging for this agent module.
setup_logging(__name__)

0034
class Poller(BaseAgent):
    """
    Poller works to submit and running tasks to WFMS.
    """

    def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
                 max_updates_per_round=2000, name='Poller', message_bulk_size=1000, locking_period=1800,
                 use_process_pool=False, poller_max_number_workers=None, **kwargs):
        # Poller-specific worker cap overrides the generic one when given.
        if poller_max_number_workers:
            self.max_number_workers = int(poller_max_number_workers)
        else:
            self.max_number_workers = int(max_number_workers)
        # Ensure the thread pool is at least as large as the worker cap.
        if int(num_threads) < int(self.max_number_workers):
            num_threads = int(self.max_number_workers)

        self.set_max_workers()

        # NOTE(review): the use_process_pool argument is accepted but ignored —
        # the superclass is always initialized with use_process_pool=False; confirm intended.
        super(Poller, self).__init__(num_threads=num_threads, name=name, use_process_pool=False, **kwargs)
        self.config_section = Sections.Carrier
        self.poll_period = int(poll_period)              # seconds between polls
        self.locking_period = int(locking_period)        # seconds a DB lock stays valid
        self.retries = int(retries)
        self.retrieve_bulk_size = int(retrieve_bulk_size)
        self.message_bulk_size = int(message_bulk_size)

        # The attributes probed with hasattr() below are presumably injected from
        # the agent config by BaseAgent.__init__ (via **kwargs) — TODO confirm.
        # Each is normalized to int here, or defaulted when absent/falsy.
        if not hasattr(self, 'new_poll_period') or not self.new_poll_period:
            self.new_poll_period = 10
        else:
            self.new_poll_period = int(self.new_poll_period)
        if not hasattr(self, 'new_fail_poll_period') or not self.new_fail_poll_period:
            self.new_fail_poll_period = 120
        else:
            self.new_fail_poll_period = int(self.new_fail_poll_period)
        if not hasattr(self, 'update_poll_period') or not self.update_poll_period:
            self.update_poll_period = self.poll_period
        else:
            self.update_poll_period = int(self.update_poll_period)

        if not hasattr(self, 'update_poll_period_for_new_task') or not self.update_poll_period_for_new_task:
            self.update_poll_period_for_new_task = 180
        else:
            self.update_poll_period_for_new_task = int(self.update_poll_period_for_new_task)

        # Multiplier applied to a poll period when a poll fails repeatedly.
        if hasattr(self, 'poll_period_increase_rate'):
            self.poll_period_increase_rate = float(self.poll_period_increase_rate)
        else:
            self.poll_period_increase_rate = 2

        # Upper bounds (seconds) for the backed-off poll periods.
        if hasattr(self, 'max_new_poll_period'):
            self.max_new_poll_period = int(self.max_new_poll_period)
        else:
            self.max_new_poll_period = 3600 * 6
        if hasattr(self, 'max_update_poll_period'):
            self.max_update_poll_period = int(self.max_update_poll_period)
        else:
            self.max_update_poll_period = 3600 * 6

        # Count of currently running worker invocations (see process_update_processing).
        self.number_workers = 0
        # Re-validate max_number_workers: super().__init__ may have replaced the
        # value set above with a config-supplied one — TODO confirm.
        if not hasattr(self, 'max_number_workers') or not self.max_number_workers:
            self.max_number_workers = 3
        else:
            self.max_number_workers = int(self.max_number_workers)

        self.max_updates_per_round = int(max_updates_per_round)
        self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round)

        # enable_executors comes in as a config string; only (case-insensitive)
        # 'true' enables the extra executor pool.
        if not hasattr(self, 'enable_executors') or not self.enable_executors:
            self.enable_executors = False
        else:
            if str(self.enable_executors).lower() == 'true':
                self.enable_executors = True
            else:
                self.enable_executors = False
        self.logger.info("enable_executors: %s" % self.enable_executors)

        # Timestamp of the last queue-size log line (used for rate limiting).
        self.show_queue_size_time = None

        # Lazily created extra executor pool (see get_extra_executors()).
        self.extra_executors = None

        # Cache of the status list used by get_running_processings().
        self._running_processing_status = None

        if hasattr(self, 'clean_locks_time_period'):
            self.clean_locks_time_period = int(self.clean_locks_time_period)
        else:
            self.clean_locks_time_period = 1800
0120
0121 def get_extra_executors(self):
0122 if self.enable_executors:
0123 if self.extra_executors is None:
0124 name = self.executor_name + "_Extra"
0125 self.extra_executors = self.create_executors(name, max_workers=self.num_threads)
0126 return self.extra_executors
0127
0128 def is_ok_to_run_more_processings(self):
0129 if self.get_num_free_workers() > 0:
0130 return True
0131 return False
0132
0133 def get_bulk_size(self):
0134 return min(self.retrieve_bulk_size, self.get_num_free_workers())
0135
0136 def show_queue_size(self):
0137 if self.show_queue_size_time is None or time.time() - self.show_queue_size_time >= 600:
0138 self.show_queue_size_time = time.time()
0139
0140 exec_max_workers = self.executors.get_max_workers()
0141 exec_num_workers = self.executors.get_num_workers()
0142 q_str = "number of processings: %s, max number of processings: %s" % (exec_num_workers, exec_max_workers)
0143 self.logger.debug(q_str)
0144
0145 def init(self):
0146 try:
0147 status = [ProcessingStatus.New, ProcessingStatus.Submitting, ProcessingStatus.Submitted,
0148 ProcessingStatus.Running, ProcessingStatus.FinishedOnExec]
0149 core_processings.clean_next_poll_at(status)
0150 except Exception as ex:
0151 self.logger.info(f"Failed clean next_poll_at: {ex}")
0152
    def get_running_processings(self):
        """
        Get running processing
        """
        try:
            # Back off when all worker slots are busy.
            if not self.is_ok_to_run_more_processings():
                return []

            self.show_queue_size()

            # min_request_id is loaded asynchronously (see run()); until it is
            # known the query cannot be bounded, so poll nothing.
            if BaseAgent.min_request_id is None:
                return []

            # All states that require active polling: running states plus the
            # To*/-ing command states.
            processing_status = [ProcessingStatus.Submitting, ProcessingStatus.Submitted,
                                 ProcessingStatus.Running, ProcessingStatus.FinishedOnExec,
                                 ProcessingStatus.ToCancel, ProcessingStatus.Cancelling,
                                 ProcessingStatus.ToSuspend, ProcessingStatus.Suspending,
                                 ProcessingStatus.ToResume, ProcessingStatus.Resuming,
                                 ProcessingStatus.ToExpire, ProcessingStatus.Expiring,
                                 ProcessingStatus.ToFinish, ProcessingStatus.ToForceFinish]
            self._running_processing_status = processing_status

            processings = core_processings.get_processings_by_status(status=processing_status,
                                                                     locking=True, update_poll=True,
                                                                     min_request_id=BaseAgent.min_request_id,
                                                                     bulk_size=self.get_bulk_size())

            if processings:
                processing_ids = [pr['processing_id'] for pr in processings]
                self.logger.info("Main thread get [submitting + submitted + running] processings to process: %s" % (str(processing_ids)))

            # Hand each locked processing to a worker thread.
            for pr in processings:
                self.submit(self.process_update_processing, **{"processing": pr})

            return processings
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                # Oracle deadlock: expected occasionally under concurrency; the
                # next cycle will retry, so only warn.
                self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
            return []
0199
0200 def get_processing(self, processing_id, status=None, exclude_status=None, locking=False):
0201 try:
0202 return core_processings.get_processing_by_id_status(processing_id=processing_id,
0203 status=status,
0204 exclude_status=exclude_status,
0205 locking=locking,
0206 to_lock=True,
0207 lock_period=self.locking_period)
0208 except exceptions.DatabaseException as ex:
0209 if 'ORA-00060' in str(ex):
0210 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0211 else:
0212
0213 self.logger.error(ex)
0214 self.logger.error(traceback.format_exc())
0215 return None
0216
0217 def get_work_tag_attribute(self, work_tag, attribute):
0218 work_tag_attribute_value = None
0219 if work_tag:
0220 work_tag_attribute = work_tag + "_" + attribute
0221 if hasattr(self, work_tag_attribute):
0222 work_tag_attribute_value = int(getattr(self, work_tag_attribute))
0223 return work_tag_attribute_value
0224
    def load_poll_period(self, processing, parameters, new=False):
        """
        Fill poll-period entries in *parameters*, preferring per-work-tag overrides
        over the agent-level defaults.

        :param processing: processing record (dict-like) whose metadata holds the work object.
        :param parameters: update-parameters dict to extend (mutated and returned).
        :param new: when True, use update_poll_period_for_new_task instead of update_poll_period.
        :returns: the (mutated) parameters dict.
        """
        # The work object lives either under 'processing' (newer records) or
        # directly under 'work' (older records).
        if 'processing' in processing['processing_metadata']:
            proc = processing['processing_metadata']['processing']
            work = proc.work
        else:
            work = processing['processing_metadata']['work']

        work_tag = work.get_work_tag()

        # new_poll_period: a per-tag override always wins; otherwise only set it
        # when the stored value differs from the agent default (avoids no-op updates).
        work_tag_new_poll_period = self.get_work_tag_attribute(work_tag, "new_poll_period")
        if work_tag_new_poll_period:
            parameters['new_poll_period'] = work_tag_new_poll_period
        elif self.new_poll_period and processing['new_poll_period'] != self.new_poll_period:
            parameters['new_poll_period'] = self.new_poll_period

        if new:
            # New tasks use the dedicated update poll period.
            work_tag_update_poll_period_for_new = self.get_work_tag_attribute(work_tag, "update_poll_period_for_new_task")
            if work_tag_update_poll_period_for_new:
                parameters['update_poll_period'] = work_tag_update_poll_period_for_new
            elif self.update_poll_period_for_new_task and processing['update_poll_period'] != self.update_poll_period_for_new_task:
                parameters['update_poll_period'] = self.update_poll_period_for_new_task
        else:
            work_tag_update_poll_period = self.get_work_tag_attribute(work_tag, "update_poll_period")
            if work_tag_update_poll_period:
                parameters['update_poll_period'] = work_tag_update_poll_period
            elif self.update_poll_period and processing['update_poll_period'] != self.update_poll_period:
                parameters['update_poll_period'] = self.update_poll_period
        return parameters
0253
0254 def get_log_prefix(self, processing):
0255 return "<request_id=%s,transform_id=%s,processing_id=%s>" % (processing['request_id'],
0256 processing['transform_id'],
0257 processing['processing_id'])
0258
    def update_processing(self, processing, processing_model, use_bulk_update_mappings=True, renew_updated_at=False):
        """
        Persist one poll result (processing parameters, contents, messages) with
        deadlock retries; on final failure fall back to writing only the
        status/retries/errors so the record is not left locked.

        :param processing: dict of updates ('update_processing', 'update_contents', ...).
        :param processing_model: the processing DB record (source of ids and current status).
        :param use_bulk_update_mappings: passed through to the bulk-update layer.
        :param renew_updated_at: when True, touch updated_at to now (UTC).
        """
        try:
            if processing:
                log_prefix = self.get_log_prefix(processing_model)

                self.logger.info(log_prefix + "update_processing: %s" % (processing['update_processing']['parameters']))

                # Always release the lock as part of the update.
                processing['update_processing']['parameters']['locking'] = ProcessingLocking.Idle

                if renew_updated_at:
                    processing['update_processing']['parameters']['updated_at'] = datetime.datetime.utcnow()

                # Never move a processing backwards to Submitting once it has
                # already progressed past that state.
                if 'status' in processing['update_processing']['parameters']:
                    new_status = processing['update_processing']['parameters']['status']
                    if new_status == ProcessingStatus.Submitting and processing_model['status'].value > ProcessingStatus.Submitting.value:
                        processing['update_processing']['parameters']['status'] = ProcessingStatus.Submitted

                self.logger.info(log_prefix + "update_processing: %s" % (processing['update_processing']['parameters']))

                # Retry loop: database errors are retried up to 5 times with a
                # growing randomized backoff, then re-raised.
                retry = True
                retry_num = 0
                while retry:
                    retry = False
                    retry_num += 1
                    try:
                        core_processings.update_processing_contents(update_processing=processing.get('update_processing', None),
                                                                    request_id=processing_model['request_id'],
                                                                    update_collections=processing.get('update_collections', None),
                                                                    update_contents=processing.get('update_contents', None),
                                                                    update_dep_contents=processing.get('update_dep_contents', None),
                                                                    messages=processing.get('messages', None),
                                                                    update_messages=processing.get('update_messages', None),
                                                                    new_contents=processing.get('new_contents', None),
                                                                    new_update_contents=processing.get('new_update_contents', None),
                                                                    new_contents_ext=processing.get('new_contents_ext', None),
                                                                    update_contents_ext=processing.get('update_contents_ext', None),
                                                                    new_input_dependency_contents=processing.get('new_input_dependency_contents', None),
                                                                    use_bulk_update_mappings=use_bulk_update_mappings,
                                                                    message_bulk_size=self.message_bulk_size)
                    except exceptions.DatabaseException as ex:
                        if 'ORA-00060' in str(ex):
                            self.logger.warn(log_prefix + "update_processing (cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
                        else:
                            self.logger.error(ex)
                            self.logger.error(traceback.format_exc())
                        # NOTE(review): every DatabaseException is retried here,
                        # not only deadlocks — confirm intended.
                        if retry_num < 5:
                            retry = True
                            if retry_num <= 1:
                                random_sleep = random.randint(1, 10)
                            elif retry_num <= 2:
                                random_sleep = random.randint(1, 60)
                            else:
                                random_sleep = random.randint(1, 120)
                            self.logger.error(f"{log_prefix} retry_num: {retry_num} random_sleep: {random_sleep}")
                            time.sleep(random_sleep)
                        else:
                            raise ex
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            self.logger.warn("Failed to update_processings: %s" % json_dumps(processing))
            # Fallback: persist at least the status/retries/errors and unlock,
            # so the processing is not stuck in the Locking state.
            try:
                processing_id = processing['update_processing']['processing_id']

                parameters = {'status': processing['update_processing']['parameters']['status'],
                              'locking': ProcessingLocking.Idle}
                if 'new_retries' in processing['update_processing']['parameters']:
                    parameters['new_retries'] = processing['update_processing']['parameters']['new_retries']
                if 'update_retries' in processing['update_processing']['parameters']:
                    parameters['update_retries'] = processing['update_processing']['parameters']['update_retries']
                if 'errors' in processing['update_processing']['parameters']:
                    parameters['errors'] = processing['update_processing']['parameters']['errors']

                self.logger.warn(log_prefix + "update_processing exception result: %s" % (parameters))
                core_processings.update_processing(processing_id=processing_id, parameters=parameters)
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
0337
0338 def handle_update_processing(self, processing):
0339 try:
0340 log_prefix = self.get_log_prefix(processing)
0341 executors = None
0342 if self.enable_executors:
0343 executors = self.get_extra_executors()
0344
0345 ret_handle_update_processing = handle_update_processing_new(processing,
0346 self.agent_attributes,
0347 max_updates_per_round=self.max_updates_per_round,
0348 max_jobs_per_round=self.max_updates_per_round,
0349 executors=executors,
0350 logger=self.logger,
0351 log_prefix=log_prefix)
0352
0353 process_status, new_contents, new_input_dependency_contents, ret_msgs, update_contents, parameters, new_contents_ext, update_contents_ext = ret_handle_update_processing
0354
0355 coll_metadata = parameters.pop("coll_metadata", None)
0356
0357 proc = processing['processing_metadata']['processing']
0358 work = proc.work
0359
0360 update_collections = []
0361 if work.is_data_work():
0362 input_collections = work.get_input_collections(poll_externel=True)
0363 output_collections = work.get_output_collections()
0364 for output_coll in output_collections:
0365 if not output_coll.coll_metadata:
0366 output_coll.coll_metadata = {}
0367 for k, v in coll_metadata:
0368 output_coll.coll_metadata[k] = v
0369 for coll in input_collections:
0370 u_coll = {
0371 'coll_id': coll.coll_id,
0372 'total_files': coll.get("total_files", 0),
0373 'processed_files': coll.get("availability", 0),
0374 'processing_files': 0,
0375 'activated_files': 0,
0376 'preprocessing_files': 0,
0377 'new_files': 0,
0378 'failed_files': 0,
0379 'missing_files': 0,
0380 'bytes': coll.get("bytes", 0),
0381 'ext_files': 0,
0382 'processed_ext_files': 0,
0383 'failed_ext_files': 0,
0384 'missing_ext_files': 0,
0385 'coll_metadata': coll.coll_metadata,
0386 }
0387 update_collections.append(u_coll)
0388 for coll in output_collections:
0389 u_coll = {
0390 'coll_id': coll.coll_id,
0391 'total_files': coll.get("total_files", 0),
0392 'processed_files': coll.get("availability", 0),
0393 'processing_files': coll.get("processing", 0),
0394 'activated_files': 0,
0395 'preprocessing_files': 0,
0396 'new_files': 0,
0397 'failed_files': coll.get("stuck", 0),
0398 'missing_files': 0,
0399 'bytes': 0,
0400 'coll_metadata': coll.coll_metadata,
0401 }
0402 update_collections.append(u_coll)
0403
0404 if work.use_dependency_to_release_jobs():
0405 new_process_status = ProcessingStatus.Triggering
0406 else:
0407 new_process_status = process_status
0408 if is_process_terminated(process_status):
0409 new_process_status = ProcessingStatus.Terminating
0410 if is_process_finished(process_status):
0411 new_process_status = ProcessingStatus.Terminating
0412 else:
0413 retries = processing['update_retries'] + 1
0414 if processing['max_update_retries'] and retries < processing['max_update_retries']:
0415 work.reactivate_processing(processing, log_prefix=log_prefix)
0416 process_status = ProcessingStatus.Running
0417 new_process_status = ProcessingStatus.Running
0418 else:
0419 if (update_contents or new_contents or new_contents_ext or update_contents_ext or ret_msgs):
0420 new_process_status = ProcessingStatus.Synchronizing
0421
0422 update_processing = {'processing_id': processing['processing_id'],
0423 'parameters': {'status': new_process_status,
0424 'substatus': process_status,
0425 'locking': ProcessingLocking.Idle}}
0426
0427 if coll_metadata and 'error' in coll_metadata:
0428 update_processing['parameters']['errors'] = coll_metadata['error']
0429
0430 update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])
0431
0432 if proc.submitted_at:
0433 if not processing['submitted_at'] or processing['submitted_at'] < proc.submitted_at:
0434 update_processing['parameters']['submitted_at'] = proc.submitted_at
0435
0436 if proc.workload_id and not processing['workload_id']:
0437 update_processing['parameters']['workload_id'] = proc.workload_id
0438
0439
0440 update_processing['parameters']['processing_metadata'] = processing['processing_metadata']
0441
0442 if parameters:
0443
0444 for p in parameters:
0445 update_processing['parameters'][p] = parameters[p]
0446
0447 ret = {'update_processing': update_processing,
0448 'update_contents': update_contents,
0449 'new_contents': new_contents,
0450 'new_input_dependency_contents': new_input_dependency_contents,
0451 'messages': ret_msgs,
0452 'new_contents_ext': new_contents_ext,
0453 'update_contents_ext': update_contents_ext,
0454 'processing_status': new_process_status}
0455
0456 except exceptions.ProcessFormatNotSupported as ex:
0457 self.logger.error(ex)
0458 self.logger.error(traceback.format_exc())
0459
0460 retries = processing['update_retries'] + 1
0461 if not processing['max_update_retries'] or retries < processing['max_update_retries']:
0462 proc_status = ProcessingStatus.Running
0463 else:
0464 proc_status = ProcessingStatus.Failed
0465 error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0466
0467
0468 update_poll_period = int(processing['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
0469 if update_poll_period > self.max_update_poll_period:
0470 update_poll_period = self.max_update_poll_period
0471
0472 update_processing = {'processing_id': processing['processing_id'],
0473 'parameters': {'status': proc_status,
0474 'locking': ProcessingLocking.Idle,
0475 'update_retries': retries,
0476 'update_poll_period': update_poll_period,
0477 'errors': processing['errors'] if processing['errors'] else {}}}
0478 update_processing['parameters']['errors'].update(error)
0479
0480 ret = {'update_processing': update_processing,
0481 'update_contents': []}
0482 except Exception as ex:
0483 self.logger.error(ex)
0484 self.logger.error(traceback.format_exc())
0485
0486 retries = processing['update_retries'] + 1
0487 if not processing['max_update_retries'] or retries < processing['max_update_retries']:
0488 proc_status = ProcessingStatus.Running
0489 else:
0490 proc_status = ProcessingStatus.Failed
0491 error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0492 update_processing = {'processing_id': processing['processing_id'],
0493 'parameters': {'status': proc_status,
0494 'locking': ProcessingLocking.Idle,
0495 'update_retries': retries,
0496 'errors': processing['errors'] if processing['errors'] else {}}}
0497 update_processing['parameters']['errors'].update(error)
0498 update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])
0499
0500 ret = {'update_processing': update_processing,
0501 'update_contents': []}
0502 return ret
0503
    def handle_update_iprocessing(self, processing):
        """
        Poll an iWorkflow/iWork processing through its processing-type plugin
        and build the corresponding database update.

        Never raises: on error the returned update marks the processing Running
        (to retry later) or Failed (retries exhausted) with the error recorded.

        :param processing: processing record (dict-like), with 'processing_type' set.
        :returns: dict consumable by update_processing().
        """
        try:
            log_prefix = self.get_log_prefix(processing)

            # The poller plugin is selected by processing type, e.g. 'iworkflow_poller'.
            executors, plugin = None, None
            if processing['processing_type']:
                plugin_name = processing['processing_type'].name.lower() + '_poller'
                plugin = self.get_plugin(plugin_name)
            else:
                raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])

            ret_handle_update_processing = handle_update_iprocessing(processing,
                                                                     self.agent_attributes,
                                                                     plugin=plugin,
                                                                     max_updates_per_round=self.max_updates_per_round,
                                                                     executors=executors,
                                                                     logger=self.logger,
                                                                     log_prefix=log_prefix)

            process_status, new_contents, new_input_dependency_contents, ret_msgs, update_contents, parameters, new_contents_ext, update_contents_ext = ret_handle_update_processing

            new_process_status = process_status
            if is_process_terminated(process_status):
                new_process_status = ProcessingStatus.Terminating
                # NOTE(review): both branches below assign Terminating, so the
                # is_process_finished() distinction currently has no effect —
                # confirm whether the unfinished branch was meant to retry
                # (compare handle_update_processing).
                if is_process_finished(process_status):
                    new_process_status = ProcessingStatus.Terminating
                else:
                    new_process_status = ProcessingStatus.Terminating

            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': {'status': new_process_status,
                                                'substatus': process_status,
                                                'locking': ProcessingLocking.Idle}}

            update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])

            # Only move submitted_at forward; written into 'parameters' which is
            # merged into update_processing['parameters'] below.
            if 'submitted_at' in processing['processing_metadata']:
                if not processing['submitted_at'] or processing['submitted_at'] < processing['processing_metadata']['submitted_at']:
                    parameters['submitted_at'] = processing['processing_metadata']['submitted_at']

            if 'workload_id' in processing['processing_metadata']:
                parameters['workload_id'] = processing['processing_metadata']['workload_id']

            update_processing['parameters']['processing_metadata'] = processing['processing_metadata']

            # Merge any extra parameters produced by the plugin handler.
            if parameters:
                for p in parameters:
                    update_processing['parameters'][p] = parameters[p]

            ret = {'update_processing': update_processing,
                   'update_contents': update_contents,
                   'new_contents': new_contents,
                   'new_input_dependency_contents': new_input_dependency_contents,
                   'messages': ret_msgs,
                   'new_contents_ext': new_contents_ext,
                   'update_contents_ext': update_contents_ext,
                   'processing_status': new_process_status}
        except exceptions.ProcessFormatNotSupported as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            # Keep retrying (Running) until max_update_retries is exhausted,
            # backing off the poll period up to max_update_poll_period.
            retries = processing['update_retries'] + 1
            if not processing['max_update_retries'] or retries < processing['max_update_retries']:
                proc_status = ProcessingStatus.Running
            else:
                proc_status = ProcessingStatus.Failed
            error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}

            update_poll_period = int(processing['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if update_poll_period > self.max_update_poll_period:
                update_poll_period = self.max_update_poll_period

            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': {'status': proc_status,
                                                'locking': ProcessingLocking.Idle,
                                                'update_retries': retries,
                                                'update_poll_period': update_poll_period,
                                                'errors': processing['errors'] if processing['errors'] else {}}}
            update_processing['parameters']['errors'].update(error)

            ret = {'update_processing': update_processing,
                   'update_contents': []}
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            retries = processing['update_retries'] + 1
            if not processing['max_update_retries'] or retries < processing['max_update_retries']:
                proc_status = ProcessingStatus.Running
            else:
                proc_status = ProcessingStatus.Failed
            error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': {'status': proc_status,
                                                'locking': ProcessingLocking.Idle,
                                                'update_retries': retries,
                                                'errors': processing['errors'] if processing['errors'] else {}}}
            update_processing['parameters']['errors'].update(error)
            update_processing['parameters'] = self.load_poll_period(processing, update_processing['parameters'])

            ret = {'update_processing': update_processing,
                   'update_contents': []}
        return ret
0610
0611 def process_update_processing(self, event=None, processing=None):
0612 self.number_workers += 1
0613 pro_ret = ReturnCode.Ok.value
0614 try:
0615 if processing is None and event:
0616 self.logger.info("process_update_processing, event: %s" % str(event))
0617
0618 pr = self.get_processing(processing_id=event._processing_id, status=None, exclude_status=[ProcessingStatus.Prepared], locking=True)
0619 if not pr:
0620 self.logger.warn("Cannot find processing for event: %s" % str(event))
0621
0622 pro_ret = ReturnCode.Ok.value
0623 elif pr['status'] in Terminated_processing_status:
0624 parameters = {'locking': ProcessingLocking.Idle}
0625 update_processing = {'processing_id': pr['processing_id'],
0626 'parameters': parameters}
0627 ret = {'update_processing': update_processing,
0628 'update_contents': []}
0629 self.update_processing(ret, pr, renew_updated_at=True)
0630 pro_ret = ReturnCode.Ok.value
0631 else:
0632 processing = pr
0633 if processing:
0634 pr = processing
0635 log_pre = self.get_log_prefix(pr)
0636
0637 self.logger.info(log_pre + "process_update_processing")
0638 if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
0639 ret = self.handle_update_iprocessing(pr)
0640 else:
0641 ret = self.handle_update_processing(pr)
0642
0643
0644 self.update_processing(ret, pr, renew_updated_at=True)
0645
0646
0647 if True:
0648
0649 event_content = {}
0650 if (('update_contents' in ret and ret['update_contents']) or ('new_contents' in ret and ret['new_contents'])):
0651 event_content['has_updates'] = True
0652 if is_process_terminated(pr['substatus']):
0653 event_content['Terminated'] = True
0654 event_content['is_terminating'] = True
0655 self.logger.info(log_pre + "TriggerProcessingEvent(processing_id: %s)" % pr['processing_id'])
0656 event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'], content=event_content)
0657 self.event_bus.send(event)
0658 elif 'processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating:
0659 self.logger.info(log_pre + "TerminatedProcessingEvent(processing_id: %s)" % pr['processing_id'])
0660 event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
0661 event.set_terminating()
0662 self.event_bus.send(event)
0663 else:
0664 if 'processing_status' in ret and ret['processing_status'] == ProcessingStatus.Synchronizing:
0665 self.logger.info(log_pre + "SyncProcessingEvent(processing_id: %s)" % pr['processing_id'])
0666 event = SyncProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
0667 event.set_has_updates()
0668 self.event_bus.send(event)
0669 except Exception as ex:
0670 self.logger.error(ex)
0671 self.logger.error(traceback.format_exc())
0672 pro_ret = ReturnCode.Failed.value
0673 self.number_workers -= 1
0674 return pro_ret
0675
0676 def clean_locks(self, force=False):
0677 try:
0678 self.logger.info(f"clean locking: force: {force}")
0679 health_items = self.get_health_items()
0680 min_request_id = BaseAgent.min_request_id
0681 hostname, pid, thread_id, thread_name = self.get_process_thread_info()
0682 ret = core_processings.clean_locking(health_items=health_items, min_request_id=min_request_id,
0683 time_period=self.clean_locks_time_period,
0684 force=force, hostname=hostname, pid=pid)
0685 self.logger.info(f"clean locking finished. Cleaned locks: {ret}")
0686 except Exception as ex:
0687 self.logger.info(f"Failed clean locking: {ex}")
0688
0689 def init_event_function_map(self):
0690 self.event_func_map = {
0691 EventType.UpdateProcessing: {
0692 'pre_check': self.is_ok_to_run_more_processings,
0693 'exec_func': self.process_update_processing
0694 }
0695 }
0696
0697 def run(self):
0698 """
0699 Main run function.
0700 """
0701 try:
0702 self.logger.info("Starting main thread")
0703 self.init_thread_info()
0704
0705 self.load_plugins()
0706 self.init()
0707
0708 self.clean_locks(force=True)
0709 time.sleep(5)
0710
0711 self.add_default_tasks()
0712
0713 self.init_event_function_map()
0714
0715 task = self.create_task(task_func=self.get_running_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
0716 self.add_task(task)
0717
0718 task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
0719 self.add_task(task)
0720
0721 task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
0722 self.add_task(task)
0723
0724 self.execute()
0725 except KeyboardInterrupt:
0726 self.stop()
0727
0728
if __name__ == '__main__':
    # Run the Poller as a standalone agent daemon.
    agent = Poller()
    agent()