File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import traceback
0012
0013 from idds.common import exceptions
0014 from idds.common.constants import ProcessingType, ProcessingStatus, ProcessingLocking
0015 from idds.common.utils import setup_logging, truncate_string
0016 from idds.core import processings as core_processings
0017 from idds.agents.common.baseagent import BaseAgent
0018 from idds.agents.common.eventbus.event import (EventType,
0019 NewProcessingEvent,
0020 PreparedProcessingEvent,
0021 UpdateTransformEvent)
0022
0023 from .utils import handle_new_processing, handle_prepared_processing
0024 from .iutils import handle_new_iprocessing
0025 from .poller import Poller
0026
0027 setup_logging(__name__)
0028
0029
class Submitter(Poller):
    """
    Submitter agent: picks up New/Prepared processings and submits them to WFMS.
    """

    def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
                 name='Submitter', use_process_pool=False, message_bulk_size=1000, **kwargs):
        # NOTE(review): poll_period, retries and message_bulk_size are accepted here but
        # not forwarded to Poller and not used in this class — confirm whether they are
        # intentionally ignored or should be passed through.
        self.max_number_workers = max_number_workers
        # set_max_workers() (defined in a base class) may override
        # self.max_number_workers from configuration; size the thread pool
        # with the adjusted value.
        self.set_max_workers()
        num_threads = self.max_number_workers

        super(Submitter, self).__init__(num_threads=num_threads, max_number_workers=self.max_number_workers,
                                        name=name, use_process_pool=use_process_pool,
                                        retrieve_bulk_size=retrieve_bulk_size, **kwargs)
        # Lazily-built cache mapping site name -> cloud name (see get_site_to_cloud).
        self.site_to_cloud = None

        # Status filters recorded by the main-thread pollers; the event handlers
        # use them to detect processings whose status changed in the meantime.
        self._new_processing_status = None
        self._prepared_processing_status = None
0048
0049 def get_new_processings(self):
0050 """
0051 Get new processing
0052 """
0053 try:
0054 if not self.is_ok_to_run_more_processings():
0055 return []
0056
0057 self.show_queue_size()
0058
0059 if BaseAgent.min_request_id is None:
0060 return []
0061
0062 processing_status = [ProcessingStatus.New]
0063 self._new_processing_status = processing_status
0064 processings = core_processings.get_processings_by_status(status=processing_status, locking=True,
0065 new_poll=True,
0066 min_request_id=BaseAgent.min_request_id,
0067 bulk_size=self.get_bulk_size())
0068
0069
0070 if processings:
0071 processing_ids = [pr['processing_id'] for pr in processings]
0072 self.logger.info("Main thread get [new] processings to process: %s" % str(processing_ids))
0073
0074 for pr in processings:
0075 self.submit(self.process_new_processing, **{"processing": pr})
0076
0077 return processings
0078 except exceptions.DatabaseException as ex:
0079 if 'ORA-00060' in str(ex):
0080 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0081 else:
0082
0083 self.logger.error(ex)
0084 self.logger.error(traceback.format_exc())
0085 return []
0086
0087 def get_prepared_processings(self):
0088 """
0089 Get prepared processing
0090 """
0091 try:
0092 if not self.is_ok_to_run_more_processings():
0093 return []
0094
0095 self.show_queue_size()
0096
0097 if BaseAgent.min_request_id is None:
0098 return []
0099
0100 processing_status = [ProcessingStatus.Prepared]
0101 self._prepared_processing_status = processing_status
0102 processings = core_processings.get_processings_by_status(status=processing_status, locking=True,
0103 new_poll=True,
0104 min_request_id=BaseAgent.min_request_id,
0105 bulk_size=self.get_bulk_size())
0106
0107
0108 if processings:
0109 processing_ids = [pr['processing_id'] for pr in processings]
0110 self.logger.info("Main thread get [prepared] processings to process: %s" % str(processing_ids))
0111
0112 for pr in processings:
0113 self.submit(self.process_prepared_processing, **{"processing": pr})
0114
0115 return processings
0116 except exceptions.DatabaseException as ex:
0117 if 'ORA-00060' in str(ex):
0118 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0119 else:
0120
0121 self.logger.error(ex)
0122 self.logger.error(traceback.format_exc())
0123 return []
0124
0125 def get_site_to_cloud(self, site, log_prefix=''):
0126 try:
0127 if self.site_to_cloud is None:
0128 self.logger.debug(log_prefix + " agent_attributes: %s" % str(self.agent_attributes))
0129 self.site_to_cloud = {}
0130 if self.agent_attributes and 'domapandawork' in self.agent_attributes and self.agent_attributes['domapandawork']:
0131 if 'site_to_cloud' in self.agent_attributes['domapandawork'] and self.agent_attributes['domapandawork']['site_to_cloud']:
0132 site_to_clouds = self.agent_attributes['domapandawork']['site_to_cloud'].split(",")
0133 for site_to_cloud in site_to_clouds:
0134 local_site, cloud = site_to_cloud.split(':')
0135 if local_site not in self.site_to_cloud:
0136 self.site_to_cloud[local_site] = cloud
0137 self.logger.debug(log_prefix + " site_to_cloud: %s" % self.site_to_cloud)
0138
0139 if site and self.site_to_cloud:
0140 cloud = self.site_to_cloud.get(site, None)
0141 if cloud:
0142 self.logger.debug(log_prefix + "cloud for site(%s): %s" % (site, cloud))
0143 return cloud
0144 if 'default' in self.site_to_cloud:
0145 cloud = self.site_to_cloud.get('default', None)
0146 self.logger.debug(log_prefix + "cloud for default site(%s): %s" % (site, cloud))
0147 return cloud
0148 except Exception as ex:
0149 self.logger.error(ex)
0150 return None
0151
    def handle_new_processing(self, processing, check_previous=True):
        """
        Submit a processing that is in status New.

        When check_previous is True, submission only happens after all parent
        processings (comma separated ids in 'parent_internal_id') exist and
        have at least been submitted; otherwise the processing is simply
        unlocked so a later round can retry.

        :param processing: the processing record (dict-like).
        :param check_previous: whether to verify parent processings first.
        :returns: a dict of updates ('update_processing', collections,
                  contents, messages) to be applied via update_processing();
                  on error, a dict that re-schedules or fails the processing
                  with an error record.
        """
        try:
            log_prefix = self.get_log_prefix(processing)

            executors = None
            if self.enable_executors:
                executors = self.get_extra_executors()

            if check_previous:
                pre_works_are_ok = True
                if processing['parent_internal_id']:
                    parent_internal_ids = processing['parent_internal_id'].split(",")

                    # A processing may list itself as a parent; ignore that entry.
                    if processing['internal_id'] in parent_internal_ids:
                        parent_internal_ids.remove(processing['internal_id'])
                    if not parent_internal_ids:
                        pre_works_are_ok = True
                    else:
                        prs = core_processings.get_processings(
                            request_id=processing['request_id'],
                            internal_ids=parent_internal_ids,
                            loop_index=processing['loop_index']
                        )
                        if not prs:
                            # Parents not created yet.
                            pre_works_are_ok = False
                        else:
                            all_pr_internal_ids = []
                            for pr in prs:
                                # A parent must at least be submitting, running
                                # or terminated before this processing starts.
                                if pr['status'] not in [
                                    ProcessingStatus.Submitting,
                                    ProcessingStatus.Submitted,
                                    ProcessingStatus.Running,
                                    ProcessingStatus.Finished,
                                    ProcessingStatus.Failed,
                                    ProcessingStatus.FinishedOnStep,
                                    ProcessingStatus.FinishedOnExec,
                                    ProcessingStatus.FinishedTerm,
                                    ProcessingStatus.SubFinished,
                                    ProcessingStatus.Broken,
                                    ProcessingStatus.Terminating,
                                    ProcessingStatus.ToTrigger,
                                    ProcessingStatus.Triggering,
                                    ProcessingStatus.Synchronizing,
                                    ProcessingStatus.Prepared
                                ]:
                                    pre_works_are_ok = False
                                    break
                                all_pr_internal_ids.append(pr['internal_id'])
                            # Every listed parent must have been found above.
                            if set(parent_internal_ids) != set(all_pr_internal_ids):
                                pre_works_are_ok = False
            else:
                pre_works_are_ok = True

            if pre_works_are_ok:
                # Delegate the actual submission to the module-level helper
                # imported from .utils.
                ret_new_processing = handle_new_processing(processing,
                                                           self.agent_attributes,
                                                           func_site_to_cloud=self.get_site_to_cloud,
                                                           max_updates_per_round=self.max_updates_per_round,
                                                           executors=executors,
                                                           logger=self.logger,
                                                           log_prefix=log_prefix)
                status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing

                if not status:
                    raise exceptions.ProcessSubmitFailed(str(errors))

                parameters = {'status': ProcessingStatus.Prepared,
                              'substatus': ProcessingStatus.Prepared,
                              'locking': ProcessingLocking.Idle,
                              'processing_metadata': processing['processing_metadata']}
                parameters = self.load_poll_period(processing, parameters, new=True)

                # The in-memory record is updated too so the caller can check
                # the new substatus without re-reading the database.
                processing['substatus'] = ProcessingStatus.Prepared
                update_processing = {'processing_id': processing['processing_id'],
                                     'parameters': parameters}
                ret = {'update_processing': update_processing,
                       'update_collections': update_colls,
                       'update_contents': [],
                       'new_contents': new_contents,
                       'new_input_dependency_contents': new_input_dependency_contents,
                       'messages': msgs,
                       }
            else:
                # Parents not ready: only release the lock; retry later.
                parameters = {'locking': ProcessingLocking.Idle}
                update_processing = {'processing_id': processing['processing_id'],
                                     'parameters': parameters}
                ret = {'update_processing': update_processing,
                       'update_collections': [],
                       'update_contents': [],
                       'new_contents': [],
                       'new_input_dependency_contents': [],
                       'messages': [],
                       }
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            # Fail permanently only after max_new_retries attempts (0/None
            # means retry forever).
            retries = processing['new_retries'] + 1
            if not processing['max_new_retries'] or retries < processing['max_new_retries']:
                pr_status = processing['status']
            else:
                pr_status = ProcessingStatus.Failed

            # Back off the poll period (new_poll_period is a timedelta),
            # capped at max_new_poll_period.
            new_poll_period = int(processing['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if new_poll_period > self.max_new_poll_period:
                new_poll_period = self.max_new_poll_period

            error = {'process_new_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
            parameters = {'status': pr_status,
                          'locking': ProcessingLocking.Idle,
                          'new_poll_period': new_poll_period,
                          'errors': processing['errors'] if processing['errors'] else {},
                          'new_retries': retries}
            parameters['errors'].update(error)

            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': parameters}
            ret = {'update_processing': update_processing,
                   'update_contents': []}
        return ret
0275
    def handle_new_iprocessing(self, processing):
        """
        Submit a new iWorkflow/iWork processing through its type-specific
        submitter plugin.

        :param processing: the processing record (dict-like); its
            processing_type selects the plugin (e.g. iWorkflow ->
            'iworkflow_submitter').
        :returns: a dict of updates to be applied via update_processing();
                  on error, a dict that re-schedules or fails the processing
                  with an error record.
        :raises exceptions.ProcessSubmitFailed: handled internally; surfaces
            as the error branch of the returned dict.
        """
        try:
            log_prefix = self.get_log_prefix(processing)

            executors, plugin = None, None
            if processing['processing_type']:
                # Plugin name derives from the enum member name, lower-cased.
                plugin_name = processing['processing_type'].name.lower() + '_submitter'
                plugin = self.get_plugin(plugin_name)
            else:
                raise exceptions.ProcessSubmitFailed('No corresponding submitter plugins for %s' % processing['processing_type'])
            # Delegate the actual submission to the module-level helper
            # imported from .iutils.
            ret_new_processing = handle_new_iprocessing(processing,
                                                        self.agent_attributes,
                                                        plugin=plugin,
                                                        func_site_to_cloud=self.get_site_to_cloud,
                                                        max_updates_per_round=self.max_updates_per_round,
                                                        executors=executors,
                                                        logger=self.logger,
                                                        log_prefix=log_prefix)
            status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing

            if not status:
                raise exceptions.ProcessSubmitFailed(str(errors))

            parameters = {'status': ProcessingStatus.Submitting,
                          'substatus': ProcessingStatus.Submitting,
                          'locking': ProcessingLocking.Idle,
                          'processing_metadata': processing['processing_metadata']}
            parameters = self.load_poll_period(processing, parameters, new=True)

            # Propagate fields the helper may have set on the record.
            if 'submitted_at' in processing:
                parameters['submitted_at'] = processing['submitted_at']

            if 'workload_id' in processing:
                parameters['workload_id'] = processing['workload_id']

            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': parameters}
            ret = {'update_processing': update_processing,
                   'update_collections': update_colls,
                   'update_contents': [],
                   'new_contents': new_contents,
                   'new_input_dependency_contents': new_input_dependency_contents,
                   'messages': msgs,
                   }
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            # Fail permanently only after max_new_retries attempts (0/None
            # means retry forever).
            retries = processing['new_retries'] + 1
            if not processing['max_new_retries'] or retries < processing['max_new_retries']:
                pr_status = processing['status']
            else:
                pr_status = ProcessingStatus.Failed

            # Back off the poll period (new_poll_period is a timedelta),
            # capped at max_new_poll_period.
            new_poll_period = int(processing['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if new_poll_period > self.max_new_poll_period:
                new_poll_period = self.max_new_poll_period

            error = {'submit_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
            parameters = {'status': pr_status,
                          'locking': ProcessingLocking.Idle,
                          'new_poll_period': new_poll_period,
                          'errors': processing['errors'] if processing['errors'] else {},
                          'new_retries': retries}
            parameters['errors'].update(error)

            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': parameters}
            ret = {'update_processing': update_processing,
                   'update_contents': []}
        return ret
0350
    def handle_prepared_processing(self, processing, check_previous=False):
        """
        Submit a processing that is in status Prepared.

        When check_previous is True, submission waits until every parent
        processing exists and has a workload_id (i.e. has itself been
        submitted); otherwise the processing is only unlocked for a later
        retry.

        :param processing: the processing record (dict-like).
        :param check_previous: whether to verify parent processings first
            (default False for prepared processings).
        :returns: a dict of updates to be applied via update_processing();
                  on error, a dict that re-schedules or fails the processing
                  with an error record.
        """
        try:
            log_prefix = self.get_log_prefix(processing)

            executors = None
            if self.enable_executors:
                executors = self.get_extra_executors()

            if check_previous:
                pre_works_are_ok = True
                if processing['parent_internal_id']:
                    parent_internal_ids = processing['parent_internal_id'].split(",")
                    prs = core_processings.get_processings(
                        request_id=processing['request_id'],
                        internal_ids=parent_internal_ids,
                        loop_index=processing['loop_index']
                    )
                    if not prs:
                        # Parents not created yet.
                        pre_works_are_ok = False
                    else:
                        all_pr_internal_ids = []
                        for pr in prs:
                            # A parent without a workload_id has not been
                            # submitted to the WFMS yet.
                            if not pr['workload_id']:
                                pre_works_are_ok = False
                                break
                            all_pr_internal_ids.append(pr['internal_id'])
                        # Every listed parent must have been found above.
                        if set(parent_internal_ids) != set(all_pr_internal_ids):
                            pre_works_are_ok = False
            else:
                pre_works_are_ok = True

            if pre_works_are_ok:
                # Delegate the actual submission to the module-level helper
                # imported from .utils.
                ret_new_processing = handle_prepared_processing(processing,
                                                                self.agent_attributes,
                                                                func_site_to_cloud=self.get_site_to_cloud,
                                                                max_updates_per_round=self.max_updates_per_round,
                                                                executors=executors,
                                                                logger=self.logger,
                                                                log_prefix=log_prefix)
                status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing

                if not status:
                    raise exceptions.ProcessSubmitFailed(str(errors))

                parameters = {'status': ProcessingStatus.Submitting,
                              'substatus': ProcessingStatus.Submitting,
                              'locking': ProcessingLocking.Idle,
                              'processing_metadata': processing['processing_metadata']}
                parameters = self.load_poll_period(processing, parameters, new=True)

                # Pull submission results from the embedded processing object.
                proc = processing['processing_metadata']['processing']
                if proc.submitted_at:
                    # Only move submitted_at forward.
                    if not processing['submitted_at'] or processing['submitted_at'] < proc.submitted_at:
                        parameters['submitted_at'] = proc.submitted_at

                # Record the workload id assigned by the WFMS, once.
                if proc.workload_id and not processing['workload_id']:
                    parameters['workload_id'] = proc.workload_id

                update_processing = {'processing_id': processing['processing_id'],
                                     'parameters': parameters}
                ret = {'update_processing': update_processing,
                       'update_collections': update_colls,
                       'update_contents': [],
                       'new_contents': new_contents,
                       'new_input_dependency_contents': new_input_dependency_contents,
                       'messages': msgs,
                       }
            else:
                # Parents not ready: only release the lock; retry later.
                parameters = {'locking': ProcessingLocking.Idle}
                update_processing = {'processing_id': processing['processing_id'],
                                     'parameters': parameters}
                ret = {'update_processing': update_processing,
                       'update_collections': [],
                       'update_contents': [],
                       'new_contents': [],
                       'new_input_dependency_contents': [],
                       'messages': [],
                       }
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

            # Fail permanently only after max_new_retries attempts (0/None
            # means retry forever).
            retries = processing['new_retries'] + 1
            if not processing['max_new_retries'] or retries < processing['max_new_retries']:
                pr_status = processing['status']
            else:
                pr_status = ProcessingStatus.Failed

            # Back off the poll period (new_poll_period is a timedelta),
            # clamped between new_fail_poll_period and max_new_poll_period.
            new_poll_period = int(processing['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if new_poll_period < self.new_fail_poll_period:
                new_poll_period = self.new_fail_poll_period
            if new_poll_period > self.max_new_poll_period:
                new_poll_period = self.max_new_poll_period

            error = {'submit_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
            parameters = {'status': pr_status,
                          'locking': ProcessingLocking.Idle,
                          'new_poll_period': new_poll_period,
                          'errors': processing['errors'] if processing['errors'] else {},
                          'new_retries': retries}
            parameters['errors'].update(error)

            update_processing = {'processing_id': processing['processing_id'],
                                 'parameters': parameters}
            ret = {'update_processing': update_processing,
                   'update_contents': []}
        return ret
0462
    def process_new_processing(self, event=None, processing=None):
        """
        Worker entry point for a New processing.

        Either a processing record is passed directly (bulk poll path) or it
        is looked up from a NewProcessingEvent.  iWorkflow/iWork processings
        go through handle_new_iprocessing; everything else through
        handle_new_processing.  Follow-up events (PreparedProcessingEvent,
        NewProcessingEvent for dependent processings) are emitted as needed.

        :param event: optional NewProcessingEvent carrying a processing id.
        :param processing: optional processing record fetched by the poller.
        """
        self.number_workers += 1
        try:
            if processing is None and event:
                self.logger.info("process_new_processing, event: %s" % str(event))
                pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
                if not pr:
                    self.logger.warn("Cannot find processing for event: %s" % str(event))
                elif self._new_processing_status and pr['status'] not in self._new_processing_status:
                    # Status changed since the event was queued: just unlock.
                    parameters = {'locking': ProcessingLocking.Idle}
                    update_processing = {'processing_id': pr['processing_id'],
                                         'parameters': parameters}
                    ret = {'update_processing': update_processing,
                           'update_contents': []}
                    self.update_processing(ret, pr)
                else:
                    processing = pr
            if processing:
                pr = processing
                log_pre = self.get_log_prefix(pr)
                self.logger.info(log_pre + "process_new_processing")
                if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
                    ret = self.handle_new_iprocessing(pr)

                    self.update_processing(ret, pr)

                    # iWorkflow/iWork go straight to the prepared stage.
                    self.logger.info(log_pre + "PreparedProcessingEvent(processing_id: %s)" % pr['processing_id'])
                    event = PreparedProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
                    event.set_has_updates()
                    self.event_bus.send(event)
                else:
                    ret = self.handle_new_processing(pr)
                    self.update_processing(ret, pr)
                    # handle_new_processing sets substatus to Prepared only on
                    # a successful submission.
                    if processing['substatus'] in [ProcessingStatus.Prepared]:
                        self.logger.info(log_pre + "PreparedProcessingEvent(processing_id: %s)" % pr['processing_id'])
                        event = PreparedProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'])
                        event.set_has_updates()
                        self.event_bus.send(event)

                    # Wake up processings that list this one as a parent.
                    following_prs = core_processings.get_processings(
                        request_id=processing['request_id'],
                        parent_internal_ids=[processing['internal_id']],
                        loop_index=processing['loop_index']
                    )
                    if following_prs:
                        for following_pr in following_prs:
                            self.logger.info(log_pre + "NewProcessingEvent(processing_id: %s)" % following_pr['processing_id'])
                            event = NewProcessingEvent(publisher_id=self.id, processing_id=following_pr['processing_id'])
                            self.event_bus.send(event)

        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
        self.number_workers -= 1
0520
    def process_prepared_processing(self, event=None, processing=None):
        """
        Worker entry point for a Prepared processing.

        Either a processing record is passed directly (bulk poll path) or it
        is looked up from a PreparedProcessingEvent.  Regular processings are
        submitted via handle_prepared_processing and an UpdateTransformEvent
        is emitted; iWorkflow/iWork should never reach this state and are
        only unlocked with an error log.

        :param event: optional PreparedProcessingEvent carrying a processing id.
        :param processing: optional processing record fetched by the poller.
        """
        self.number_workers += 1
        try:
            if processing is None and event:
                self.logger.info("process_prepared_processing, event: %s" % str(event))
                pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
                if not pr:
                    self.logger.warn("Cannot find processing for event: %s" % str(event))
                elif self._prepared_processing_status and pr['status'] not in self._prepared_processing_status:
                    # Status changed since the event was queued: just unlock.
                    parameters = {'locking': ProcessingLocking.Idle}
                    update_processing = {'processing_id': pr['processing_id'],
                                         'parameters': parameters}
                    ret = {'update_processing': update_processing,
                           'update_contents': []}
                    self.update_processing(ret, pr)
                else:
                    processing = pr
            if processing:
                pr = processing
                log_pre = self.get_log_prefix(pr)
                self.logger.info(log_pre + "process_prepared_processing")
                if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
                    # iWorkflow/iWork are submitted directly from the New
                    # stage; Prepared is unexpected here, so only unlock.
                    self.logger.error(log_pre + "process_prepared_processing, iWorkflow/iWork should not have status prepared")
                    parameters = {'locking': ProcessingLocking.Idle}
                    update_processing = {'processing_id': pr['processing_id'],
                                         'parameters': parameters}
                    ret = {'update_processing': update_processing,
                           'update_contents': []}
                    self.update_processing(ret, pr)
                else:
                    ret = self.handle_prepared_processing(pr)

                    self.update_processing(ret, pr)

                    # Notify the transformer that this transform was submitted.
                    self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
                    submit_event_content = {'event': 'submitted'}
                    event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=submit_event_content)
                    event.set_has_updates()
                    self.event_bus.send(event)

        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
        self.number_workers -= 1
0572
0573 def init_event_function_map(self):
0574 self.event_func_map = {
0575 EventType.NewProcessing: {
0576 'pre_check': self.is_ok_to_run_more_processings,
0577 'exec_func': self.process_new_processing
0578 },
0579 EventType.PreparedProcessing: {
0580 'pre_check': self.is_ok_to_run_more_processings,
0581 'exec_func': self.process_prepared_processing
0582 }
0583 }
0584
0585 def run(self):
0586 """
0587 Main run function.
0588 """
0589 try:
0590 self.logger.info("Starting main thread")
0591 self.init_thread_info()
0592
0593 self.load_plugins()
0594 self.init()
0595
0596 self.add_default_tasks()
0597
0598 self.init_event_function_map()
0599
0600 task = self.create_task(task_func=self.get_new_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
0601 self.add_task(task)
0602
0603 task = self.create_task(task_func=self.get_prepared_processings, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
0604 self.add_task(task)
0605
0606 task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
0607 self.add_task(task)
0608
0609 self.execute()
0610 except KeyboardInterrupt:
0611 self.stop()
0612
0613
if __name__ == '__main__':
    # Run the Submitter as a standalone agent process.
    submitter_agent = Submitter()
    submitter_agent()