File indexing completed on 2026-04-09 07:58:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
import traceback
try:
    # Python 3
    from queue import Queue, Empty
except ImportError:
    # Python 2
    from Queue import Queue, Empty

from idds.common.constants import (Sections, WorkprogressStatus, WorkprogressLocking, TransformStatus)
from idds.common.utils import setup_logging
from idds.core import (workprogress as core_workprogress, transforms as core_transforms)
from idds.workflow.work import WorkStatus
from idds.agents.common.baseagent import BaseAgent
0025
# Module-level call, executed once at import time: set up logging for this
# module's name via idds.common.utils.setup_logging.
setup_logging(__name__)
0027
0028
class Marshaller(BaseAgent):
    """
    Marshaller works to organize workflow, activate the current works in a workflow to workprogresses.

    ``run`` wires two pipelines of scheduled tasks connected by queues:

    * new workprogresses:
      ``get_new_workprogresses`` -> ``new_task_queue`` -> ``process_new_workprogresses``
      -> ``new_output_queue`` -> ``finish_new_workprogresses``
    * running workprogresses:
      ``get_running_workprogresses`` -> ``running_task_queue`` -> ``process_running_workprogresses``
      -> ``running_output_queue`` -> ``finish_running_workprogresses``

    plus a periodic ``clean_locks`` task.
    """

    def __init__(self, num_threads=1, poll_time_period=1800, retrieve_bulk_size=10, **kwargs):
        """
        :param num_threads: forwarded to BaseAgent; ``run`` also schedules this many
            processing tasks per pipeline.
        :param poll_time_period: passed as ``period`` when polling running
            workprogresses (unit not visible here -- presumably seconds, TODO confirm).
        :param retrieve_bulk_size: maximum number of workprogresses fetched per poll.
        :param kwargs: forwarded to BaseAgent.
        """
        super(Marshaller, self).__init__(num_threads=num_threads, **kwargs)
        self.config_section = Sections.Marshaller
        self.poll_time_period = int(poll_time_period)
        self.retrieve_bulk_size = int(retrieve_bulk_size)

        # Queues linking the fetch -> process -> finish stages (see run()).
        self.new_task_queue = Queue()
        self.new_output_queue = Queue()
        self.running_task_queue = Queue()
        self.running_output_queue = Queue()

    @staticmethod
    def _drain_queue(task_queue):
        """
        Yield the items currently available in ``task_queue`` without ever blocking.

        ``run`` schedules ``num_threads`` consumers on the same queue. If BaseAgent
        runs them concurrently, an ``empty()`` check followed by a blocking ``get()``
        can lose the race and park the worker until the next item arrives.
        ``get_nowait()`` avoids that.
        """
        while True:
            try:
                item = task_queue.get_nowait()
            except Empty:
                return
            yield item

    def get_new_workprogresses(self):
        """
        Get new workprogress to process

        Fetches (with locking) up to ``retrieve_bulk_size`` workprogresses in the
        New, Ready or Extend status.

        :returns: list of workprogress dicts.
        """

        workprogress_status = [WorkprogressStatus.New, WorkprogressStatus.Ready, WorkprogressStatus.Extend]
        workprogresses_new = core_workprogress.get_workprogresses_by_status(status=workprogress_status,
                                                                            locking=True,
                                                                            bulk_size=self.retrieve_bulk_size)

        self.logger.debug("Main thread get %s New+Ready+Extend workprogresses to process" % len(workprogresses_new))
        if workprogresses_new:
            self.logger.info("Main thread get %s New+Ready+Extend workprogresses to process" % len(workprogresses_new))
        return workprogresses_new

    def _create_new_transforms(self, workprogress, wf):
        """
        Build transform records for the works returned by ``wf.get_new_works()``.

        Each work is copied and the workflow proxy is attached to the copy. Both the
        original work and the copy are stored in the transform metadata.

        :param workprogress: workprogress dict (reads workprogress_id, request_id,
            workload_id, priority, expired_at).
        :param wf: the workflow object stored in the workprogress metadata.
        :returns: list of transform dicts with status ``TransformStatus.New``.
        """
        transforms = []
        for work in wf.get_new_works():
            new_work = work.copy()
            new_work.add_proxy(wf.get_proxy())
            transform = {'workprogress_id': workprogress['workprogress_id'],
                         'request_id': workprogress['request_id'],
                         'workload_id': workprogress['workload_id'],
                         'transform_type': work.get_work_type(),
                         'transform_tag': work.get_work_tag(),
                         'priority': workprogress['priority'],
                         'status': TransformStatus.New,
                         'retries': 0,
                         'expired_at': workprogress['expired_at'],
                         # NOTE(review): the key spelling 'orginal_work' (sic) is kept as-is;
                         # readers of the stored metadata may look it up by this name.
                         'transform_metadata': {'orginal_work': work, 'work': new_work}}
            transforms.append(transform)
        return transforms

    def process_new_workprogress(self, workprogress):
        """
        Process new workprogress

        Creates transforms for the workflow's new works, then marks the
        workprogress as Transforming and releases its lock (in memory; persisted
        by ``finish_new_workprogresses``).

        :param workprogress: workprogress dict with the workflow under
            ``workprogress_metadata['workflow']``. It is mutated in place.
        :returns: ``{'workprogress': workprogress, 'new_transforms': [...]}``
        """
        wf = workprogress['workprogress_metadata']['workflow']
        transforms = self._create_new_transforms(workprogress, wf)

        self.logger.info("Processing workprogress(%s): new transforms: %s" % (workprogress['workprogress_id'],
                                                                              transforms))

        workprogress['locking'] = WorkprogressLocking.Idle
        workprogress['status'] = WorkprogressStatus.Transforming
        return {'workprogress': workprogress, 'new_transforms': transforms}

    def process_new_workprogresses(self):
        """
        Process every workprogress currently in ``new_task_queue``.

        Failures are logged per item and do not stop the loop.

        :returns: list of results from ``process_new_workprogress``.
        """
        ret = []
        for workprogress in self._drain_queue(self.new_task_queue):
            if not workprogress:
                continue
            try:
                self.logger.info("Main thread processing new workprogress: %s" % workprogress)
                ret_workprogress = self.process_new_workprogress(workprogress)
                if ret_workprogress:
                    ret.append(ret_workprogress)
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return ret

    def finish_new_workprogresses(self):
        """
        Persist results in ``new_output_queue``: update each workprogress
        (status, locking, metadata) and register its new transforms.
        """
        for ret in self._drain_queue(self.new_output_queue):
            # Check before touching ret['workprogress'] (the original logged first,
            # so a falsy entry raised instead of being skipped).
            if not ret:
                continue
            try:
                self.logger.info("Main thread finishing new workprogress: %s" % ret['workprogress'])
                wp = ret['workprogress']
                tfs = ret['new_transforms']
                wp_parameters = {'status': wp['status'],
                                 'locking': wp['locking'],
                                 'workprogress_metadata': wp['workprogress_metadata']}
                core_workprogress.update_workprogress(workprogress_id=wp['workprogress_id'],
                                                      parameters=wp_parameters,
                                                      new_transforms=tfs)
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())

    def get_running_workprogresses(self):
        """
        Get workprogresses to running

        Fetches (with locking) up to ``retrieve_bulk_size`` workprogresses in the
        Transforming, ToCancel or Cancelling status, passing ``poll_time_period``
        as ``period``.

        :returns: list of workprogress dicts.
        """
        workprogress_status = [WorkprogressStatus.Transforming, WorkprogressStatus.ToCancel,
                               WorkprogressStatus.Cancelling]
        workprogresses = core_workprogress.get_workprogresses_by_status(status=workprogress_status,
                                                                        period=self.poll_time_period,
                                                                        locking=True,
                                                                        bulk_size=self.retrieve_bulk_size)

        self.logger.debug("Main thread get %s progressing workprogresses to process" % len(workprogresses))
        if workprogresses:
            self.logger.info("Main thread get %s progressing workprogresses to process" % len(workprogresses))
        return workprogresses

    def process_running_workprogress(self, workprogress):
        """
        process running workprogresses

        * Transforming: create transforms for any new works of the workflow.
        * ToCancel: request ``TransformStatus.ToCancel`` for every current work
          that is not already Finished/SubFinished/Failed/Cancelling/Cancelled.

        Then refresh each current work's status from its transform and derive the
        new workprogress status from the workflow state.

        :param workprogress: workprogress dict with the workflow under
            ``workprogress_metadata['workflow']``.
        :returns: dict with ``workprogress_id``, ``parameters`` (status, locking,
            workprogress_metadata, errors), ``new_transforms`` and
            ``update_transforms`` (keyed by work id, which is used as transform id).
        """
        self.logger.info("process_running_workprogress: workprogress_id: %s" % workprogress['workprogress_id'])
        workprogress_metadata = workprogress['workprogress_metadata']
        wf = workprogress_metadata['workflow']

        new_transforms = []
        if workprogress['status'] in [WorkprogressStatus.Transforming]:
            new_transforms = self._create_new_transforms(workprogress, wf)
            self.logger.info("Processing workprogress(%s): new transforms: %s" % (workprogress['workprogress_id'],
                                                                                  new_transforms))

        update_transforms = {}
        if workprogress['status'] in [WorkprogressStatus.ToCancel]:
            already_done = [WorkStatus.Finished, WorkStatus.SubFinished,
                            WorkStatus.Failed, WorkStatus.Cancelling,
                            WorkStatus.Cancelled]
            for work in wf.get_current_works():
                if work.get_status() not in already_done:
                    update_transforms[work.get_work_id()] = {'status': TransformStatus.ToCancel}

        # Sync every current work with the status of its transform.
        for work in wf.get_current_works():
            tf = core_transforms.get_transform(transform_id=work.get_work_id())
            work_status = WorkStatus(tf['status'].value)
            work.set_status(work_status)
            work.set_terminated_msg(msg=None)

        if wf.is_terminated():
            if wf.is_finished():
                wp_status = WorkprogressStatus.Finished
            elif wf.is_subfinished():
                wp_status = WorkprogressStatus.SubFinished
            elif wf.is_failed():
                wp_status = WorkprogressStatus.Failed
            elif wf.is_cancelled():
                wp_status = WorkprogressStatus.Cancelled
            else:
                # Terminated in a state not matched above: treat as failure.
                wp_status = WorkprogressStatus.Failed
            wp_msg = wf.get_terminated_msg()
        elif workprogress['status'] in [WorkprogressStatus.ToCancel, WorkprogressStatus.Cancelling]:
            # Keep a cancellation in progress as Cancelling. Reverting it to
            # Transforming would let the next poll create new transforms for a
            # workflow that is being cancelled.
            wp_status = WorkprogressStatus.Cancelling
            wp_msg = None
        else:
            wp_status = WorkprogressStatus.Transforming
            wp_msg = None
        parameters = {'status': wp_status,
                      'locking': WorkprogressLocking.Idle,
                      'workprogress_metadata': workprogress_metadata,
                      'errors': {'msg': wp_msg}}
        ret = {'workprogress_id': workprogress['workprogress_id'],
               'parameters': parameters,
               'new_transforms': new_transforms,
               'update_transforms': update_transforms}
        return ret

    def process_running_workprogresses(self):
        """
        Process every workprogress currently in ``running_task_queue``.

        Failures are logged per item and do not stop the loop.

        :returns: list of results from ``process_running_workprogress``.
        """
        ret = []
        for workprogress in self._drain_queue(self.running_task_queue):
            if not workprogress:
                continue
            try:
                self.logger.info("Main thread processing running workprogress: %s" % workprogress)
                ret_workprogress = self.process_running_workprogress(workprogress)
                if ret_workprogress:
                    ret.append(ret_workprogress)
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return ret

    def finish_running_workprogresses(self):
        """
        Persist results in ``running_output_queue``: update each workprogress and
        create/update its transforms in one ``update_workprogress`` call.
        """
        for ret in self._drain_queue(self.running_output_queue):
            if not ret:
                continue
            try:
                self.logger.info("Main thread finishing processing workprogress: %s" % ret)

                core_workprogress.update_workprogress(workprogress_id=ret['workprogress_id'],
                                                      parameters=ret['parameters'],
                                                      new_transforms=ret['new_transforms'],
                                                      update_transforms=ret['update_transforms'])
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())

    def clean_locks(self):
        """Call ``core_workprogress.clean_locking()`` (scheduled periodically by ``run``)."""
        self.logger.info("clean locking")
        core_workprogress.clean_locking()

    def _add_periodic_task(self, task_func, task_output_queue, delay_time):
        """Create a no-argument, priority-1 task via ``create_task`` and register it with ``add_task``."""
        task = self.create_task(task_func=task_func, task_output_queue=task_output_queue,
                                task_args=tuple(), task_kwargs={}, delay_time=delay_time, priority=1)
        self.add_task(task)

    def run(self):
        """
        Main run function.

        Registers the fetch/process/finish tasks for new and running
        workprogresses plus the lock-cleaning task, then calls ``execute``.
        A KeyboardInterrupt stops the agent.
        """
        try:
            self.logger.info("Starting main thread")

            self.load_plugins()

            self.add_default_tasks()

            # New workprogresses: fetch -> process (num_threads tasks) -> persist.
            self._add_periodic_task(self.get_new_workprogresses, self.new_task_queue, 1)
            for _ in range(self.num_threads):
                self._add_periodic_task(self.process_new_workprogresses, self.new_output_queue, 1)
            self._add_periodic_task(self.finish_new_workprogresses, None, 2)

            # Running workprogresses: fetch -> process (num_threads tasks) -> persist.
            self._add_periodic_task(self.get_running_workprogresses, self.running_task_queue, 1)
            for _ in range(self.num_threads):
                self._add_periodic_task(self.process_running_workprogresses, self.running_output_queue, 1)
            self._add_periodic_task(self.finish_running_workprogresses, None, 1)

            self._add_periodic_task(self.clean_locks, None, 1800)

            self.execute()
        except KeyboardInterrupt:
            self.stop()
0289
0290
if __name__ == '__main__':
    # Run the marshaller with its default settings; the agent instance is
    # invoked directly to start it.
    marshaller_agent = Marshaller()
    marshaller_agent()