File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import traceback
0012
0013 from idds.common import exceptions
0014 from idds.common.constants import (ProcessingStatus, ProcessingLocking, ReturnCode,
0015 Terminated_processing_status)
0016 from idds.common.utils import setup_logging, truncate_string
0017 from idds.core import processings as core_processings
0018 from idds.agents.common.baseagent import BaseAgent
0019 from idds.agents.common.eventbus.event import (EventType,
0020 TerminatedProcessingEvent,
0021 SyncProcessingEvent)
0022
0023 from .utils import (handle_trigger_processing,
0024 is_process_terminated)
0025 from .poller import Poller
0026
# setup_logging comes from idds.common.utils; presumably it configures logging for this module (by module name) — confirm.
0027 setup_logging(__name__)
0028
0029
class Trigger(Poller):
    """
    Trigger agent: triggers (releases) jobs of processings in ToTrigger/Triggering status.

    Work reaches the agent in two ways:
      * a periodic task (``get_trigger_processings``) fetches and locks a bulk of
        processings from the database and submits each one to a worker;
      * event-bus events (``EventType.TriggerProcessing`` and
        ``EventType.MsgTriggerProcessing``) dispatched via ``init_event_function_map``.
    """

    def __init__(self, num_threads=1, trigger_max_number_workers=None, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
                 name='Trigger', use_process_pool=False, message_bulk_size=1000, max_updates_per_round=2000, **kwargs):
        """
        Resolve the worker count and initialise the underlying Poller.

        :param num_threads: overwritten below; the thread count passed to Poller is
            always the resolved worker count.
        :param trigger_max_number_workers: Trigger-specific worker count; when truthy it
            takes precedence over ``max_number_workers``.
        :param max_number_workers: worker count used when ``trigger_max_number_workers``
            is not given.
        :param poll_period: accepted but neither used nor forwarded here.
        :param retries: accepted but neither used nor forwarded here.
        :param retrieve_bulk_size: bulk size of each database poll (forwarded to Poller).
        :param name: agent name (forwarded to Poller).
        :param use_process_pool: forwarded to Poller.
        :param message_bulk_size: accepted but neither used nor forwarded here.
        :param max_updates_per_round: passed as ``max_updates_per_round`` to
            ``utils.handle_trigger_processing`` on every call.
        """
        # NOTE(review): poll_period, retries and message_bulk_size are captured by this
        # signature and not passed to Poller.__init__, so configured values for them
        # presumably never reach the Poller — confirm whether that is intended.
        if trigger_max_number_workers:
            self.max_number_workers = int(trigger_max_number_workers)
        else:
            self.max_number_workers = int(max_number_workers)

        self.set_max_workers()

        num_threads = int(self.max_number_workers)
        super(Trigger, self).__init__(num_threads=num_threads, name=name, max_number_workers=self.max_number_workers,
                                      max_updates_per_round=max_updates_per_round, use_process_pool=use_process_pool,
                                      retrieve_bulk_size=retrieve_bulk_size, **kwargs)
        self.logger.info("num_threads: %s" % num_threads)

        self.max_updates_per_round = int(max_updates_per_round)
        self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round)

        # A 'trigger_max_number_workers' attribute set during base-class initialisation
        # (presumably from agent configuration — confirm) overrides the constructor value.
        # NOTE(review): this runs after num_threads was already passed to Poller.__init__.
        if hasattr(self, 'trigger_max_number_workers'):
            self.max_number_workers = int(self.trigger_max_number_workers)
        # Number of message-triggered processings currently being handled.
        self.number_msg_workers = 0

    def is_ok_to_run_more_msg_processings(self):
        """Pre-check for MsgTriggerProcessing events: True when at least one worker is free."""
        return self.get_num_free_workers() > 0

    def get_trigger_processings(self):
        """
        Poll the database for processings to trigger and submit them to workers.

        Fetches (with locking) up to ``retrieve_bulk_size`` processings in ToTrigger or
        Triggering status, filtered by ``BaseAgent.min_request_id``, and submits each
        one to ``process_trigger_processing``.

        :returns: the processings that were submitted, or [] when there is no worker
                  capacity, min_request_id is not loaded yet, nothing was fetched, or a
                  DatabaseException occurred.
        """
        try:
            self.show_queue_size()
            if not self.is_ok_to_run_more_processings():
                return []

            # min_request_id is presumably populated by the load_min_request_id task
            # scheduled in run(); skip polling until it is available.
            if BaseAgent.min_request_id is None:
                return []

            processing_status = [ProcessingStatus.ToTrigger, ProcessingStatus.Triggering]
            processings = core_processings.get_processings_by_status(status=processing_status,
                                                                     locking=True, update_poll=True,
                                                                     min_request_id=BaseAgent.min_request_id,
                                                                     bulk_size=self.retrieve_bulk_size)
            if not processings:
                # Also covers a None result, which the submit loop below could not iterate.
                return []

            processing_ids = [pr['processing_id'] for pr in processings]
            self.logger.info("Main thread get [ToTrigger, Triggering] processings to process: %s" % (str(processing_ids)))

            for pr in processings:
                self.submit(self.process_trigger_processing, processing=pr)

            return processings
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                # Oracle deadlock: only logged as a warning; the periodic task polls again later.
                self.logger.warning("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return []

    def _get_trigger_error_parameters(self, processing, ex):
        """
        Build the update parameters shared by both failure paths of handle_trigger_processing.

        Increments ``update_retries``. The status goes back to Running while retries
        remain (or when ``max_update_retries`` is unset) and becomes Failed otherwise.
        The lock is released. The exception text, truncated to 200 characters, is
        recorded under ``errors['update_err']``.

        The existing errors are copied rather than updated in place, so the caller's
        ``processing`` dict is left untouched.
        """
        retries = processing['update_retries'] + 1
        if not processing['max_update_retries'] or retries < processing['max_update_retries']:
            proc_status = ProcessingStatus.Running
        else:
            proc_status = ProcessingStatus.Failed

        errors = dict(processing['errors']) if processing['errors'] else {}
        errors['update_err'] = {'msg': truncate_string('%s' % (ex), length=200)}
        return {'status': proc_status,
                'locking': ProcessingLocking.Idle,
                'update_retries': retries,
                'errors': errors}

    def handle_trigger_processing(self, processing, trigger_new_updates=False):
        """
        Run the trigger logic for one processing and build the resulting update.

        :param processing: processing record (dict-like) as fetched from the core layer.
        :param trigger_new_updates: forwarded to ``utils.handle_trigger_processing``.
        :returns: dict with at least 'update_processing' and 'update_contents'.
                  On success it also carries 'messages', 'new_update_contents',
                  'update_transforms', 'update_dep_contents', 'processing_status' and
                  'has_updates'. On failure the processing is set back to Running
                  (or Failed once max_update_retries is reached) with the error recorded.
        """
        try:
            log_prefix = self.get_log_prefix(processing)
            executors = self.get_extra_executors() if self.enable_executors else None

            ret_trigger_processing = handle_trigger_processing(processing,
                                                               self.agent_attributes,
                                                               trigger_new_updates=trigger_new_updates,
                                                               max_updates_per_round=self.max_updates_per_round,
                                                               executors=executors,
                                                               logger=self.logger,
                                                               log_prefix=log_prefix)
            (process_status, update_contents, ret_msgs, parameters,
             update_dep_contents_status_name, update_dep_contents_status,
             new_update_contents, ret_update_transforms, has_updates) = ret_trigger_processing

            self.logger.debug(log_prefix + "handle_trigger_processing: ret_update_transforms: %s" % str(ret_update_transforms))

            # A terminated status is not written directly: the processing moves to
            # Terminating and the computed status is kept in 'substatus'.
            new_process_status = process_status
            if is_process_terminated(process_status):
                new_process_status = ProcessingStatus.Terminating

            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': {'status': new_process_status,
                                                'substatus': process_status,
                                                'locking': ProcessingLocking.Idle}}
            if parameters:
                # Extra update parameters from the trigger logic override the defaults above.
                update_processing['parameters'].update(parameters)

            ret = {'update_processing': update_processing,
                   'update_contents': update_contents,
                   'messages': ret_msgs,
                   'new_update_contents': new_update_contents,
                   'update_transforms': ret_update_transforms,
                   'update_dep_contents': (processing['request_id'], update_dep_contents_status_name, update_dep_contents_status),
                   'processing_status': new_process_status,
                   'has_updates': has_updates}
        except exceptions.ProcessFormatNotSupported as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            update_params = self._get_trigger_error_parameters(processing, ex)
            # Back off: scale the current poll period by poll_period_increase_rate,
            # capped at max_update_poll_period.
            update_poll_period = int(processing['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if update_poll_period > self.max_update_poll_period:
                update_poll_period = self.max_update_poll_period
            update_params['update_poll_period'] = update_poll_period

            ret = {'update_processing': {'processing_id': processing['processing_id'],
                                         'parameters': update_params},
                   'update_contents': []}
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            update_params = self._get_trigger_error_parameters(processing, ex)
            ret = {'update_processing': {'processing_id': processing['processing_id'],
                                         'parameters': self.load_poll_period(processing, update_params)},
                   'update_contents': []}
        return ret

    def process_trigger_processing_real(self, event=None, processing=None):
        """
        Trigger one processing, given either directly or through an event.

        When ``processing`` is None, the processing is fetched (locking=True) using
        ``event._processing_id``. If it is already in a terminated status, it is only
        unlocked. Otherwise it is handled and updated. Afterwards a
        TerminatedProcessingEvent is published when the new status is Terminating or
        the event content flags 'Terminated'; in all other cases a
        SyncProcessingEvent is published.

        :returns: ReturnCode.Ok.value, or ReturnCode.Failed.value if an exception was raised.
        """
        pro_ret = ReturnCode.Ok.value
        try:
            if processing is None and event:
                self.logger.info("process_trigger_processing, event: %s" % str(event))
                pr = self.get_processing(processing_id=event._processing_id, status=None,
                                         exclude_status=[ProcessingStatus.Prepared], locking=True)
                if not pr:
                    self.logger.warning("Cannot find processing for event: %s" % str(event))
                    pro_ret = ReturnCode.Ok.value
                elif pr['status'] in Terminated_processing_status:
                    # Nothing to trigger: only release the lock requested via locking=True.
                    parameters = {'locking': ProcessingLocking.Idle}
                    ret = {'update_processing': {'processing_id': pr['processing_id'],
                                                 'parameters': parameters},
                           'update_contents': []}
                    self.update_processing(ret, pr)
                    pro_ret = ReturnCode.Ok.value
                else:
                    processing = pr

            if processing:
                pr = processing
                log_pre = self.get_log_prefix(pr)
                self.logger.info(log_pre + "process_trigger_processing")
                ret = self.handle_trigger_processing(pr)
                # New update contents are deliberately not propagated from this path.
                ret['new_update_contents'] = None

                self.update_processing(ret, pr)

                event_content = event._content if event else None
                is_terminating = ('processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating)
                flagged_terminated = event_content and 'Terminated' in event_content and event_content['Terminated']
                if is_terminating or flagged_terminated:
                    self.logger.info(log_pre + "TerminatedProcessingEvent(processing_id: %s)" % pr['processing_id'])
                    new_event = TerminatedProcessingEvent(publisher_id=self.id,
                                                          processing_id=pr['processing_id'],
                                                          content=event_content)
                    new_event.set_terminating()
                    self.event_bus.send(new_event)
                else:
                    self.logger.info(log_pre + "SyncProcessingEvent(processing_id: %s)" % pr['processing_id'])
                    new_event = SyncProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'],
                                                    content=event_content)
                    self.event_bus.send(new_event)
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            pro_ret = ReturnCode.Failed.value
        return pro_ret

    def process_trigger_processing(self, event=None, processing=None):
        """
        Worker entry for polled and TriggerProcessing work.

        Tracks the call in ``number_workers``; the counter is restored even if an
        exception escapes.
        """
        self.number_workers += 1
        try:
            return self.process_trigger_processing_real(event=event, processing=processing)
        finally:
            self.number_workers -= 1

    def process_msg_trigger_processing(self, event):
        """
        Worker entry for MsgTriggerProcessing events.

        Tracks the call in ``number_msg_workers``; the counter is restored even if an
        exception escapes.
        """
        self.number_msg_workers += 1
        try:
            return self.process_trigger_processing_real(event)
        finally:
            self.number_msg_workers -= 1

    def init_event_function_map(self):
        """Map each handled event type to its capacity pre-check and executor function."""
        self.event_func_map = {
            EventType.TriggerProcessing: {
                'pre_check': self.is_ok_to_run_more_processings,
                'exec_func': self.process_trigger_processing
            },
            EventType.MsgTriggerProcessing: {
                'pre_check': self.is_ok_to_run_more_msg_processings,
                'exec_func': self.process_msg_trigger_processing
            }
        }

    def run(self):
        """
        Main run function.

        Initialises the agent and its event handlers, and schedules two tasks:
        get_trigger_processings (delay_time=10) and load_min_request_id
        (delay_time=600). It then hands control to ``self.execute()``.
        A KeyboardInterrupt stops the agent.
        """
        try:
            self.logger.info("Starting main thread")
            self.init_thread_info()

            self.load_plugins()
            self.init()

            self.add_default_tasks()

            self.init_event_function_map()

            task = self.create_task(task_func=self.get_trigger_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
            self.add_task(task)

            task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
            self.add_task(task)

            self.execute()
        except KeyboardInterrupt:
            self.stop()
0290
0291
if __name__ == '__main__':
    # Script entry point: build a Trigger agent with default settings and run it.
    trigger_agent = Trigger()
    trigger_agent()