import datetime
import random
import time
import traceback

from idds.common import exceptions
from idds.common.constants import (Sections, ReturnCode,
                                   RequestType, RequestStatus, RequestLocking,
                                   TransformType, WorkflowType, ConditionStatus,
                                   TransformStatus, ProcessingStatus,
                                   ContentStatus, ContentRelationType,
                                   CommandType, CommandStatus, CommandLocking)
from idds.common.utils import setup_logging, truncate_string, str_to_date
from idds.core import (requests as core_requests,
                       transforms as core_transforms,
                       processings as core_processings,
                       catalog as core_catalog,
                       throttlers as core_throttlers,
                       commands as core_commands)
from idds.agents.common.baseagent import BaseAgent
from idds.agents.common.eventbus.event import (EventType,
                                               UpdateRequestEvent,
                                               QueueTransformEvent,
                                               UpdateTransformEvent,
                                               AbortTransformEvent,
                                               ResumeTransformEvent,
                                               ExpireRequestEvent)

from idds.agents.common.cache.redis import get_redis_cache


setup_logging(__name__)

class Clerk(BaseAgent):
    """
    Clerk works to process requests and converts requests to transforms.
    """

    def __init__(self, num_threads=1, max_number_workers=8, poll_period=10, retrieve_bulk_size=3, cache_expire_seconds=300, pending_time=None, **kwargs):
        self.max_number_workers = max_number_workers
        self.set_max_workers()
        num_threads = self.max_number_workers
        super(Clerk, self).__init__(num_threads=num_threads, name='Clerk', **kwargs)
        self.poll_period = int(poll_period)
        self.retrieve_bulk_size = int(retrieve_bulk_size)
        self.config_section = Sections.Clerk
        self.start_at = time.time()

        if pending_time:
            self.pending_time = float(pending_time)
        else:
            self.pending_time = None

        self.cache_expire_seconds = int(cache_expire_seconds)

        if not hasattr(self, 'release_helper') or not self.release_helper:
            self.release_helper = False
        elif str(self.release_helper).lower() == 'true':
            self.release_helper = True
        else:
            self.release_helper = False

        if not hasattr(self, 'new_poll_period') or not self.new_poll_period:
            self.new_poll_period = self.poll_period
        else:
            self.new_poll_period = int(self.new_poll_period)
        if not hasattr(self, 'update_poll_period') or not self.update_poll_period:
            self.update_poll_period = self.poll_period
        else:
            self.update_poll_period = int(self.update_poll_period)
        if not hasattr(self, 'throttle_poll_period') or not self.throttle_poll_period:
            self.throttle_poll_period = self.poll_period
        else:
            self.throttle_poll_period = int(self.throttle_poll_period)

        if hasattr(self, 'poll_period_increase_rate'):
            self.poll_period_increase_rate = float(self.poll_period_increase_rate)
        else:
            self.poll_period_increase_rate = 2

        if hasattr(self, 'max_new_poll_period'):
            self.max_new_poll_period = int(self.max_new_poll_period)
        else:
            self.max_new_poll_period = 3600 * 6
        if hasattr(self, 'max_update_poll_period'):
            self.max_update_poll_period = int(self.max_update_poll_period)
        else:
            self.max_update_poll_period = 3600 * 6

        if not hasattr(self, 'new_command_poll_period') or not self.new_command_poll_period:
            self.new_command_poll_period = 1
        else:
            self.new_command_poll_period = int(self.new_command_poll_period)
        if not hasattr(self, 'update_command_poll_period') or not self.update_command_poll_period:
            self.update_command_poll_period = self.poll_period
        else:
            self.update_command_poll_period = int(self.update_command_poll_period)

        if hasattr(self, 'max_new_retries'):
            self.max_new_retries = int(self.max_new_retries)
        else:
            self.max_new_retries = 3
        if hasattr(self, 'max_update_retries'):
            self.max_update_retries = int(self.max_update_retries)
        else:
            self.max_update_retries = 0

        self.number_workers = 0
        if not hasattr(self, 'max_number_workers') or not self.max_number_workers:
            self.max_number_workers = 3
        else:
            self.max_number_workers = int(self.max_number_workers)

        self.show_queue_size_time = None

        if hasattr(self, 'clean_locks_time_period'):
            self.clean_locks_time_period = int(self.clean_locks_time_period)
        else:
            self.clean_locks_time_period = 1800

    def is_ok_to_run_more_requests(self):
        if self.get_num_free_workers() > 0:
            return True
        return False

    def show_queue_size(self):
        if self.show_queue_size_time is None or time.time() - self.show_queue_size_time >= 600:
            self.show_queue_size_time = time.time()
            q_str = "min request_id: %s, number of requests: %s, max number of requests: %s" % (BaseAgent.min_request_id,
                                                                                                self.get_num_workers(),
                                                                                                self.get_max_workers())
            self.logger.debug(q_str)

    def get_bulk_size(self):
        return min(self.retrieve_bulk_size, self.get_num_free_workers())

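    # Poll loop for brand-new requests: the min_request_id window is widened periodically
    # (every 30th poll during the first hour, every 180th one afterwards) so that requests
    # inserted with smaller ids are not missed; each fetched request is handed to a worker.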
    def get_new_requests(self):
        """
        Get new requests to process
        """
        try:
            if not self.is_ok_to_run_more_requests():
                return []

            self.show_queue_size()

            if time.time() < self.start_at + 3600:
                if BaseAgent.poll_new_min_request_id_times % 30 == 0:
                    if BaseAgent.min_request_id:
                        min_request_id = BaseAgent.min_request_id - 1000
                    else:
                        min_request_id = None
                else:
                    min_request_id = BaseAgent.min_request_id
            else:
                if BaseAgent.poll_new_min_request_id_times % 180 == 0:
                    if BaseAgent.min_request_id:
                        min_request_id = BaseAgent.min_request_id - 1000
                    else:
                        min_request_id = None
                else:
                    min_request_id = BaseAgent.min_request_id

            BaseAgent.poll_new_min_request_id_times += 1

            req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built, RequestStatus.Throttling]
            reqs_new = core_requests.get_requests_by_status_type(status=req_status, locking=True,
                                                                 min_request_id=min_request_id,
                                                                 bulk_size=self.get_bulk_size(),
                                                                 new_poll=True, only_return_id=False)

            if reqs_new:
                req_ids = [req["request_id"] for req in reqs_new]
                self.logger.info("Main thread get [New+Extend] requests to process: %s" % str(req_ids))

            for req in reqs_new:
                req_id = req["request_id"]
                self.submit(self.process_new_request, **{"request": req})
                BaseAgent.min_request_id_cache[req_id] = time.time()
                if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
                    BaseAgent.min_request_id = req_id
                    self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
                    core_requests.set_min_request_id(BaseAgent.min_request_id)
            return reqs_new
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return []

    def get_running_requests(self):
        """
        Get running requests
        """
        try:
            if not self.is_ok_to_run_more_requests():
                return []

            self.show_queue_size()

            if time.time() < self.start_at + 3600:
                if BaseAgent.poll_running_min_request_id_times % 30 == 0:
                    if BaseAgent.min_request_id:
                        min_request_id = BaseAgent.min_request_id - 1000
                    else:
                        min_request_id = None
                else:
                    min_request_id = BaseAgent.min_request_id
            else:
                if BaseAgent.poll_running_min_request_id_times % 180 == 0:
                    if BaseAgent.min_request_id:
                        min_request_id = BaseAgent.min_request_id - 1000
                    else:
                        min_request_id = None
                else:
                    min_request_id = BaseAgent.min_request_id

            BaseAgent.poll_running_min_request_id_times += 1

            req_status = [RequestStatus.Transforming, RequestStatus.ToCancel, RequestStatus.Cancelling,
                          RequestStatus.ToSuspend, RequestStatus.Suspending,
                          RequestStatus.ToExpire, RequestStatus.Expiring,
                          RequestStatus.ToFinish, RequestStatus.ToForceFinish,
                          RequestStatus.ToResume, RequestStatus.Resuming,
                          RequestStatus.Building, RequestStatus.ToClose]
            reqs = core_requests.get_requests_by_status_type(status=req_status, time_period=None,
                                                             min_request_id=min_request_id,
                                                             locking=True,
                                                             bulk_size=self.get_bulk_size(),
                                                             update_poll=True, only_return_id=False)

            if reqs:
                req_ids = [req["request_id"] for req in reqs]
                self.logger.info("Main thread get Transforming requests to running: %s" % str(req_ids))

            for req in reqs:
                req_id = req["request_id"]
                self.submit(self.process_update_request, **{"request": req})

                BaseAgent.min_request_id_cache[req_id] = time.time()
                if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
                    BaseAgent.min_request_id = req_id
                    self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
                    core_requests.set_min_request_id(BaseAgent.min_request_id)

            return reqs
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return []

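    # Pick up New/Processing commands (abort, resume, close) and dispatch the matching
    # request to the corresponding handler, marking each handled command as Processing.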
    def get_operation_requests(self):
        """
        Get operation commands to process
        """
        try:
            if not self.is_ok_to_run_more_requests():
                return []

            self.show_queue_size()

            status = [CommandStatus.New]
            new_commands = core_commands.get_commands_by_status(status=status, locking=True, period=self.new_command_poll_period)
            status = [CommandStatus.Processing]
            processing_commands = core_commands.get_commands_by_status(status=status, locking=True,
                                                                       period=self.update_command_poll_period)
            commands = new_commands + processing_commands

            if commands:
                self.logger.info("Main thread get %s commands" % len(commands))

            update_commands = []
            for cmd in commands:
                request_id = cmd['request_id']
                cmd_type = cmd['cmd_type']
                cmd_status = cmd['status']

                if BaseAgent.min_request_id is None or BaseAgent.min_request_id > request_id:
                    BaseAgent.min_request_id = request_id
                    self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
                    BaseAgent.min_request_id_cache[request_id] = time.time()
                    core_requests.set_min_request_id(BaseAgent.min_request_id)

                if cmd_status in [CommandStatus.New, CommandStatus.Processing]:
                    req = self.get_request(request_id, status=None, locking=True)
                    if req:
                        if cmd_type in [CommandType.AbortRequest]:
                            self.submit(self.process_abort_request, **{"request": req, "command": cmd["cmd_id"]})
                        elif cmd_type in [CommandType.ResumeRequest]:
                            self.submit(self.process_resume_request, **{"request": req, "command": cmd["cmd_id"]})
                        elif cmd_type in [CommandType.CloseRequest]:
                            self.submit(self.process_close_request, **{"request": req, "command": cmd["cmd_id"]})

                        u_command = {'cmd_id': cmd['cmd_id'],
                                     'status': CommandStatus.Processing,
                                     'locking': CommandLocking.Idle}
                        update_commands.append(u_command)
                    else:
                        u_command = {'cmd_id': cmd['cmd_id'],
                                     'status': CommandStatus.UnknownCommand,
                                     'locking': CommandLocking.Idle}
                        update_commands.append(u_command)
            core_commands.update_commands(update_commands)
            return commands
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return []

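    # Maintain the shared min_request_id watermark: bootstrap it from the database on the
    # first call, afterwards drop cache entries older than 12 hours and persist the
    # smallest request id that is still being tracked.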
    def clean_min_request_id(self):
        try:
            if BaseAgent.checking_min_request_id_times <= 0:
                old_min_request_id = core_requests.get_min_request_id()
                self.logger.info("old_min_request_id: %s" % old_min_request_id)
                if not old_min_request_id:
                    min_request_id = 0
                else:
                    min_request_id = old_min_request_id - 1000
                BaseAgent.min_request_id = min_request_id
            else:
                for req_id in list(BaseAgent.min_request_id_cache.keys()):
                    time_stamp = BaseAgent.min_request_id_cache[req_id]
                    if time_stamp < time.time() - 12 * 3600:
                        del BaseAgent.min_request_id_cache[req_id]

                if BaseAgent.min_request_id_cache:
                    min_request_id = min(list(BaseAgent.min_request_id_cache.keys()))
                    BaseAgent.min_request_id = min_request_id
                    core_requests.set_min_request_id(BaseAgent.min_request_id)

            BaseAgent.checking_min_request_id_times += 1
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())

    def get_request(self, request_id, status=None, locking=False):
        try:
            return core_requests.get_request_by_id_status(request_id=request_id, status=status, locking=locking)
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return None

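    # Merge the agent-level poll periods and retry limits into the request update
    # parameters; a throttled request gets the (longer) throttle poll period instead.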
    def load_poll_period(self, req, parameters, throttling=False):
        if self.new_poll_period and req['new_poll_period'] != self.new_poll_period:
            parameters['new_poll_period'] = self.new_poll_period
        if throttling:
            parameters['new_poll_period'] = self.throttle_poll_period
        if self.update_poll_period and req['update_poll_period'] != self.update_poll_period:
            parameters['update_poll_period'] = self.update_poll_period
        parameters['max_new_retries'] = req['max_new_retries'] if req['max_new_retries'] is not None else self.max_new_retries
        parameters['max_update_retries'] = req['max_update_retries'] if req['max_update_retries'] is not None else self.max_update_retries
        return parameters

    def get_work_tag_attribute(self, work_tag, attribute):
        work_tag_attribute_value = None
        if work_tag:
            work_tag_attribute = work_tag + "_" + attribute
            if hasattr(self, work_tag_attribute):
                work_tag_attribute_value = int(getattr(self, work_tag_attribute))
        return work_tag_attribute_value

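    # Build the transform record for one work object: derive retry limits from the
    # request, the work-tag specific settings or the agent defaults, map the work type
    # to a transform type, and collect condition/loop metadata and the target site.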
    def generate_transform(self, req, work, build=False, iworkflow=False):
        if iworkflow:
            wf = None
        else:
            if build:
                wf = req['request_metadata']['build_workflow']
            else:
                wf = req['request_metadata']['workflow']

        work.set_request_id(req['request_id'])
        work.username = req['username']

        transform_tag = work.get_work_tag()
        if req['max_new_retries']:
            max_new_retries = req['max_new_retries']
        else:
            work_tag_max_new_retries = self.get_work_tag_attribute(transform_tag, "max_new_retries")
            if work_tag_max_new_retries:
                max_new_retries = work_tag_max_new_retries
            else:
                max_new_retries = self.max_new_retries

        if req['max_update_retries']:
            max_update_retries = req['max_update_retries']
        else:
            work_tag_max_update_retries = self.get_work_tag_attribute(transform_tag, "max_update_retries")
            if work_tag_max_update_retries:
                max_update_retries = work_tag_max_update_retries
            else:
                max_update_retries = self.max_update_retries

        transform_type = TransformType.Workflow
        try:
            work_type = work.get_work_type()
            if build:
                pass
            elif work_type in [WorkflowType.iWorkflowLocal]:
                return None
            elif work_type in [WorkflowType.iWorkflow]:
                transform_type = TransformType.iWorkflow
            elif work_type in [WorkflowType.iWork]:
                transform_type = TransformType.iWork
            elif work_type in [WorkflowType.GenericWorkflow]:
                transform_type = TransformType.GenericWorkflow
            elif work_type in [WorkflowType.GenericWork]:
                transform_type = TransformType.GenericWork
        except Exception:
            pass

        has_previous_conditions = None
        try:
            if hasattr(work, 'get_previous_conditions'):
                work_previous_conditions = work.get_previous_conditions()
                if work_previous_conditions:
                    has_previous_conditions = len(work_previous_conditions)
        except Exception:
            pass

        triggered_conditions = []
        untriggered_conditions = []
        try:
            if hasattr(work, 'get_following_conditions'):
                following_conditions = work.get_following_conditions()
                for cond in following_conditions:
                    untriggered_conditions.append(cond)
        except Exception:
            pass

        loop_index = None
        try:
            if hasattr(work, 'get_loop_index'):
                loop_index = work.get_loop_index()
        except Exception:
            pass

        transform_status = TransformStatus.Queue
        if has_previous_conditions:
            transform_status = TransformStatus.WaitForTrigger

        site = req['site']
        if not site:
            try:
                cloud = None
                if hasattr(work, 'task_cloud') and work.task_cloud:
                    cloud = work.task_cloud

                if hasattr(work, 'task_queue') and work.task_queue:
                    queue = work.task_queue
                elif hasattr(work, 'queue') and work.queue:
                    queue = work.queue
                else:
                    queue = None

                task_site = None
                if hasattr(work, 'task_site') and work.task_site:
                    task_site = work.task_site
                site = f"{cloud},{task_site},{queue}"
            except Exception:
                pass

        new_transform = {'request_id': req['request_id'],
                         'workload_id': req['workload_id'],
                         'transform_type': transform_type,
                         'transform_tag': work.get_work_tag(),
                         'priority': req['priority'],
                         'status': transform_status,
                         'retries': 0,
                         'parent_transform_id': None,
                         'previous_transform_id': None,
                         'name': work.get_work_name(),
                         'new_poll_period': self.new_poll_period,
                         'update_poll_period': self.update_poll_period,
                         'max_new_retries': max_new_retries,
                         'max_update_retries': max_update_retries,
                         'expired_at': None,
                         'internal_id': work.internal_id,
                         'parent_internal_id': None if not work.parent_internal_ids else ",".join(work.parent_internal_ids),
                         'has_previous_conditions': has_previous_conditions,
                         'triggered_conditions': triggered_conditions,
                         'untriggered_conditions': untriggered_conditions,
                         'loop_index': loop_index,
                         'site': site,
                         'transform_metadata': {'internal_id': work.get_internal_id(),
                                                'template_work_id': work.get_template_work_id(),
                                                'sequence_id': work.get_sequence_id(),
                                                'work_name': work.get_work_name(),
                                                'work': work,
                                                'workflow': wf}
                         }

        return new_transform

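    # Build a condition record linking the transforms that must finish (previous works)
    # to the transforms that wait for the trigger (following works).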
    def generate_condition(self, req, cond):
        previous_works = cond.previous_works
        following_works = cond.following_works
        previous_transforms, following_transforms = [], []
        previous_transforms = previous_works
        following_transforms = following_works

        new_condition = {'request_id': req['request_id'],
                         'internal_id': cond.internal_id,
                         'status': ConditionStatus.WaitForTrigger,
                         'substatus': None,
                         'is_loop': False,
                         'loop_index': None,
                         'cloned_from': None,
                         'evaluate_result': None,
                         'previous_transforms': previous_transforms,
                         'following_transforms': following_transforms,
                         'condition': {'condition': cond}}
        return new_condition

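    # The get_num_active_* helpers aggregate per-site counters for the throttler and cache
    # the result in Redis for cache_expire_seconds to avoid hammering the database.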
    def get_num_active_requests(self, site_name):
        cache = get_redis_cache()
        num_requests = cache.get("num_requests", default=None)
        if num_requests is None:
            num_requests = {}
            active_status = [RequestStatus.New, RequestStatus.Ready, RequestStatus.Throttling]
            active_status1 = [RequestStatus.Transforming, RequestStatus.Terminating]
            rets = core_requests.get_num_active_requests(active_status + active_status1)
            for ret in rets:
                status, site, count = ret
                if site is None:
                    site = 'Default'
                if site not in num_requests:
                    num_requests[site] = {'new': 0, 'processing': 0}
                if status in active_status:
                    num_requests[site]['new'] += count
                elif status in active_status1:
                    num_requests[site]['processing'] += count
            cache.set("num_requests", num_requests, expire_seconds=self.cache_expire_seconds)
        default_value = {'new': 0, 'processing': 0}
        return num_requests.get(site_name, default_value)

    def get_num_active_transforms(self, site_name):
        cache = get_redis_cache()
        num_transforms = cache.get("num_transforms", default=None)
        if num_transforms is None:
            num_transforms = {}
            active_status = [TransformStatus.New, TransformStatus.Ready]
            active_status1 = [TransformStatus.Transforming, TransformStatus.Terminating]
            rets = core_transforms.get_num_active_transforms(active_status + active_status1)
            for ret in rets:
                status, site, count = ret
                if site is None:
                    site = 'Default'
                if site not in num_transforms:
                    num_transforms[site] = {'new': 0, 'processing': 0}
                if status in active_status:
                    num_transforms[site]['new'] += count
                elif status in active_status1:
                    num_transforms[site]['processing'] += count
            cache.set("num_transforms", num_transforms, expire_seconds=self.cache_expire_seconds)
        default_value = {'new': 0, 'processing': 0}
        return num_transforms.get(site_name, default_value)

    def get_num_active_processings(self, site_name):
        cache = get_redis_cache()
        num_processings = cache.get("num_processings", default=None)
        active_transforms = cache.get("active_transforms", default={})
        if num_processings is None:
            num_processings = {}
            active_transforms = {}
            active_status = [ProcessingStatus.New]
            active_status1 = [ProcessingStatus.Submitting, ProcessingStatus.Submitted,
                              ProcessingStatus.Running, ProcessingStatus.Terminating, ProcessingStatus.ToTrigger,
                              ProcessingStatus.Triggering]
            rets = core_processings.get_active_processings(active_status + active_status1)
            for ret in rets:
                req_id, trf_id, pr_id, site, status = ret
                if site is None:
                    site = 'Default'
                if site not in num_processings:
                    num_processings[site] = {'new': 0, 'processing': 0}
                    active_transforms[site] = []
                if status in active_status:
                    num_processings[site]['new'] += 1
                elif status in active_status1:
                    num_processings[site]['processing'] += 1
                    active_transforms[site].append(trf_id)
            cache.set("num_processings", num_processings, expire_seconds=self.cache_expire_seconds)
            cache.set("active_transforms", active_transforms, expire_seconds=self.cache_expire_seconds)
        default_value = {'new': 0, 'processing': 0}
        return num_processings.get(site_name, default_value), active_transforms

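    # Per-site content counters for the active transforms, split by relation type:
    # 'new' counts ContentStatus.New, 'activated' counts ContentStatus.Activated and
    # 'processed' counts any other (terminated) status.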
    def get_num_active_contents(self, site_name, active_transform_ids):
        cache = get_redis_cache()

        tf_id_site_map = {}
        all_tf_ids = []
        for site in active_transform_ids:
            all_tf_ids += active_transform_ids[site]
            for tf_id in active_transform_ids[site]:
                tf_id_site_map[tf_id] = site

        num_input_contents = cache.get("num_input_contents", default=None)
        num_output_contents = cache.get("num_output_contents", default=None)
        if num_input_contents is None or num_output_contents is None:
            num_input_contents, num_output_contents = {}, {}
            if all_tf_ids:
                ret = core_catalog.get_content_status_statistics_by_relation_type(all_tf_ids)
                for item in ret:
                    status, relation_type, transform_id, count = item
                    site = tf_id_site_map[transform_id]
                    if site not in num_input_contents:
                        num_input_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
                        num_output_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
                    if status in [ContentStatus.New]:
                        if relation_type == ContentRelationType.Input:
                            num_input_contents[site]['new'] += count
                        elif relation_type == ContentRelationType.Output:
                            num_output_contents[site]['new'] += count
                    elif status in [ContentStatus.Activated]:
                        if relation_type == ContentRelationType.Input:
                            num_input_contents[site]['activated'] += count
                        elif relation_type == ContentRelationType.Output:
                            num_output_contents[site]['activated'] += count
                    else:
                        if relation_type == ContentRelationType.Input:
                            num_input_contents[site]['processed'] += count
                        elif relation_type == ContentRelationType.Output:
                            num_output_contents[site]['processed'] += count

            cache.set("num_input_contents", num_input_contents, expire_seconds=self.cache_expire_seconds)
            cache.set("num_output_contents", num_output_contents, expire_seconds=self.cache_expire_seconds)
        default_value = {'new': 0, 'activated': 0, 'processed': 0}
        return num_input_contents.get(site_name, default_value), num_output_contents.get(site_name, default_value)

    def get_throttlers(self):
        cache = get_redis_cache()
        throttlers = cache.get("throttlers", default=None)
        if throttlers is None:
            throttler_items = core_throttlers.get_throttlers()
            throttlers = {}
            for item in throttler_items:
                throttlers[item['site']] = {'num_requests': item['num_requests'],
                                            'num_transforms': item['num_transforms'],
                                            'num_processings': item['num_processings'],
                                            'new_contents': item['new_contents'],
                                            'queue_contents': item['queue_contents'],
                                            'others': item['others'],
                                            'status': item['status']}
            cache.set("throttlers", throttlers, expire_seconds=self.cache_expire_seconds)
        return throttlers

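    # Decide whether a new request should be throttled for its site. Note the early
    # "return False" below: the per-site checks against the throttler limits are kept,
    # but they are currently short-circuited, so throttling is effectively disabled here.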
    def whether_to_throttle(self, request):
        return False

        try:
            site = request['site']
            if site is None:
                site = 'Default'
            throttlers = self.get_throttlers()
            num_requests = self.get_num_active_requests(site)
            num_transforms = self.get_num_active_transforms(site)
            num_processings, active_transforms = self.get_num_active_processings(site)
            num_input_contents, num_output_contents = self.get_num_active_contents(site, active_transforms)
            self.logger.info("throttler(site: %s): active requests(%s), transforms(%s), processings(%s)" % (site, num_requests, num_transforms, num_processings))
            self.logger.info("throttler(site: %s): active input contents(%s), output contents(%s)" % (site, num_input_contents, num_output_contents))

            throttle_requests = throttlers.get(site, {}).get('num_requests', None)
            throttle_transforms = throttlers.get(site, {}).get('num_transforms', None)
            throttle_processings = throttlers.get(site, {}).get('num_processings', None)
            throttle_new_jobs = throttlers.get(site, {}).get('new_contents', None)
            throttle_queue_jobs = throttlers.get(site, {}).get('queue_contents', None)
            self.logger.info("throttler(site: %s): throttle_requests %s, throttle_transforms: %s, throttle_processings: %s" % (site, throttle_requests, throttle_transforms, throttle_processings))
            if throttle_requests:
                if num_requests['processing'] >= throttle_requests:
                    self.logger.info("throttler(site: %s): num of processing requests (%s) is bigger than throttle_requests (%s), set throttling" % (site, num_requests['processing'], throttle_requests))
                    return True
            if throttle_transforms:
                if num_transforms['processing'] >= throttle_transforms:
                    self.logger.info("throttler(site: %s): num of processing transforms (%s) is bigger than throttle_transforms (%s), set throttling" % (site, num_transforms['processing'], throttle_transforms))
                    return True
            if throttle_processings:
                if num_processings['processing'] >= throttle_processings:
                    self.logger.info("throttler(site: %s): num of processing processings (%s) is bigger than throttle_processings (%s), set throttling" % (site, num_processings['processing'], throttle_processings))
                    return True

            new_jobs = num_input_contents['new']
            released_jobs = num_input_contents['processed']
            terminated_jobs = num_output_contents['processed']
            queue_jobs = released_jobs - terminated_jobs

            self.logger.info("throttler(site: %s): throttle_new_jobs: %s, throttle_queue_jobs: %s" % (site, throttle_new_jobs, throttle_queue_jobs))
            self.logger.info("throttler(site: %s): new_jobs: %s, queue_jobs: %s" % (site, new_jobs, queue_jobs))
            if throttle_new_jobs:
                if new_jobs >= throttle_new_jobs:
                    self.logger.info("throttler(site: %s): num of new jobs(not released) (%s) is bigger than throttle_new_jobs (%s), set throttling" % (site, new_jobs, throttle_new_jobs))
                    return True
            if throttle_queue_jobs:
                if queue_jobs >= throttle_queue_jobs:
                    self.logger.info("throttler(site: %s): num of queue jobs(released but not terminated) (%s) is bigger than throttle_queue_jobs (%s), set throttling" % (site, queue_jobs, throttle_queue_jobs))
                    return True

            return False
        except Exception as ex:
            self.logger.error("whether_to_throttle: %s" % str(ex))
            self.logger.error(traceback.format_exc())
        return False

    def get_log_prefix(self, req):
        return "<request_id=%s>" % req['request_id']

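    # Turn a new (classic workflow) request into transforms: one transform per new work
    # in the workflow, or a Throttling update when the site is over its limits.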
    def handle_new_request(self, req):
        try:
            log_pre = self.get_log_prefix(req)
            self.logger.info(log_pre + "Handle new request")
            to_throttle = self.whether_to_throttle(req)
            if to_throttle:
                ret_req = {'request_id': req['request_id'],
                           'parameters': {'status': RequestStatus.Throttling,
                                          'locking': RequestLocking.Idle}}
                ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'], throttling=True)
                self.logger.info(log_pre + "Throttle new request result: %s" % str(ret_req))
            else:
                workflow = req['request_metadata']['workflow']

                wf = workflow
                works = wf.get_new_works()
                transforms = []
                for work in works:
                    new_work = work
                    new_work.add_proxy(wf.get_proxy())

                    transform = self.generate_transform(req, work)
                    if transform:
                        transforms.append(transform)
                self.logger.debug(log_pre + "Processing request(%s): new transforms: %s" % (req['request_id'],
                                                                                            str(transforms)))

                ret_req = {'request_id': req['request_id'],
                           'parameters': {'status': RequestStatus.Transforming,
                                          'locking': RequestLocking.Idle,
                                          'request_metadata': req['request_metadata']},
                           'new_transforms': transforms}
                ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'])
                self.logger.info(log_pre + "Handle new request result: %s" % str(ret_req))
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            retries = req['new_retries'] + 1
            if not req['max_new_retries'] or retries < req['max_new_retries']:
                req_status = req['status']
            else:
                req_status = RequestStatus.Failed

            new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if new_poll_period > self.max_new_poll_period:
                new_poll_period = self.max_new_poll_period

            error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}

            ret_req = {'request_id': req['request_id'],
                       'parameters': {'status': req_status,
                                      'locking': RequestLocking.Idle,
                                      'new_retries': retries,
                                      'new_poll_period': new_poll_period,
                                      'errors': req['errors'] if req['errors'] else {}}}
            ret_req['parameters']['errors'].update(error)
            self.logger.warn(log_pre + "Handle new request error result: %s" % str(ret_req))
        return ret_req

    def handle_new_irequest(self, req):
        try:
            log_pre = self.get_log_prefix(req)
            self.logger.info(log_pre + "Handle new irequest")
            to_throttle = self.whether_to_throttle(req)
            if to_throttle:
                ret_req = {'request_id': req['request_id'],
                           'parameters': {'status': RequestStatus.Throttling,
                                          'locking': RequestLocking.Idle}}
                ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'], throttling=True)
                self.logger.info(log_pre + "Throttle new irequest result: %s" % str(ret_req))
            else:
                workflow = req['request_metadata']['workflow']

                transforms = []
                transform = self.generate_transform(req, workflow)
                if transform:
                    transforms.append(transform)
                self.logger.debug(log_pre + "Processing request(%s): new transforms: %s" % (req['request_id'],
                                                                                            str(transforms)))
                ret_req = {'request_id': req['request_id'],
                           'parameters': {'status': RequestStatus.Transforming,
                                          'locking': RequestLocking.Idle},
                           'new_transforms': transforms}
                ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'])
                self.logger.info(log_pre + "Handle new irequest result: %s" % str(ret_req))
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            retries = req['new_retries'] + 1
            if not req['max_new_retries'] or retries < req['max_new_retries']:
                req_status = req['status']
            else:
                req_status = RequestStatus.Failed

            new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if new_poll_period > self.max_new_poll_period:
                new_poll_period = self.max_new_poll_period

            error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}

            ret_req = {'request_id': req['request_id'],
                       'parameters': {'status': req_status,
                                      'locking': RequestLocking.Idle,
                                      'new_retries': retries,
                                      'new_poll_period': new_poll_period,
                                      'errors': req['errors'] if req['errors'] else {}}}
            ret_req['parameters']['errors'].update(error)
            self.logger.warn(log_pre + "Handle new irequest error result: %s" % str(ret_req))
        return ret_req

    def handle_new_generic_request(self, req):
        try:
            log_pre = self.get_log_prefix(req)
            self.logger.info(log_pre + "Handle new generic request")
            to_throttle = self.whether_to_throttle(req)
            if to_throttle:
                ret_req = {'request_id': req['request_id'],
                           'parameters': {'status': RequestStatus.Throttling,
                                          'locking': RequestLocking.Idle}}
                ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'], throttling=True)
                self.logger.info(log_pre + "Throttle new generic request result: %s" % str(ret_req))
            else:
                workflow = req['request_metadata']['workflow']

                transforms = []
                works = workflow.get_works()
                for w in works:
                    transform = self.generate_transform(req, w)
                    if transform:
                        transforms.append(transform)
                self.logger.debug(log_pre + f"Processing request({req['request_id']}): new transforms: {transforms}")

                conds = workflow.get_conditions()
                conditions = []
                for cond in conds:
                    condition = self.generate_condition(req, cond)
                    if condition:
                        conditions.append(condition)
                self.logger.debug(log_pre + f"Processing request({req['request_id']}), new conditions: {conditions}")

                ret_req = {'request_id': req['request_id'],
                           'parameters': {'status': RequestStatus.Transforming,
                                          'locking': RequestLocking.Idle},
                           'new_transforms': transforms,
                           'new_conditions': conditions}
                ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'])
                self.logger.info(log_pre + "Handle new generic request result: %s" % str(ret_req))
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            retries = req['new_retries'] + 1
            if not req['max_new_retries'] or retries < req['max_new_retries']:
                req_status = req['status']
            else:
                req_status = RequestStatus.Failed

            new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if new_poll_period > self.max_new_poll_period:
                new_poll_period = self.max_new_poll_period

            error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}

            ret_req = {'request_id': req['request_id'],
                       'parameters': {'status': req_status,
                                      'locking': RequestLocking.Idle,
                                      'new_retries': retries,
                                      'new_poll_period': new_poll_period,
                                      'errors': req['errors'] if req['errors'] else {}}}
            ret_req['parameters']['errors'].update(error)
            self.logger.warn(log_pre + "Handle new generic request error result: %s" % str(ret_req))
        return ret_req

    def has_to_build_work(self, req):
        try:
            if req['status'] in [RequestStatus.New] and 'build_workflow' in req['request_metadata']:
                log_pre = self.get_log_prefix(req)
                self.logger.info(log_pre + "has build work")
                return True

        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
        return False

    def handle_build_request(self, req):
        try:
            log_pre = self.get_log_prefix(req)
            self.logger.info(log_pre + "handle build request")

            workflow = req['request_metadata']['build_workflow']
            works = workflow.get_new_works()
            transforms = []
            for work in works:
                new_work = work
                new_work.add_proxy(workflow.get_proxy())
                transform = self.generate_transform(req, new_work, build=True)
                transforms.append(transform)
            self.logger.debug(log_pre + "Processing request(%s): new build transforms: %s" % (req['request_id'],
                                                                                              str(transforms)))

            ret_req = {'request_id': req['request_id'],
                       'parameters': {'status': RequestStatus.Building,
                                      'locking': RequestLocking.Idle,
                                      'request_metadata': req['request_metadata']},
                       'new_transforms': transforms}
            ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'])
            self.logger.info(log_pre + "Handle build request result: %s" % str(ret_req))
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            retries = req['new_retries'] + 1
            if not req['max_new_retries'] or retries < req['max_new_retries']:
                req_status = req['status']
            else:
                req_status = RequestStatus.Failed

            new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if new_poll_period > self.max_new_poll_period:
                new_poll_period = self.max_new_poll_period

            error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}

            ret_req = {'request_id': req['request_id'],
                       'parameters': {'status': req_status,
                                      'locking': RequestLocking.Idle,
                                      'new_retries': retries,
                                      'new_poll_period': new_poll_period,
                                      'errors': req['errors'] if req['errors'] else {}}}
            ret_req['parameters']['errors'].update(error)
            self.logger.warn(log_pre + "Handle build request error result: %s" % str(ret_req))
        return ret_req

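    # Persist the outcome of a request handler: write the request parameters together with
    # any new/updated transforms and new conditions in one call, retrying a few times with
    # a random back-off when Oracle reports a deadlock (ORA-00060).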
    def update_request(self, req, origin_req=None):
        new_tf_ids, update_tf_ids = [], []
        try:
            log_pre = self.get_log_prefix(req)
            self.logger.info(log_pre + "update request: %s" % req)
            req['parameters']['locking'] = RequestLocking.Idle
            req['parameters']['updated_at'] = datetime.datetime.utcnow()

            if 'new_transforms' in req:
                new_transforms = req['new_transforms']
            else:
                new_transforms = []

            if 'update_transforms' in req:
                update_transforms = req['update_transforms']
            else:
                update_transforms = {}

            if 'new_conditions' in req:
                new_conditions = req['new_conditions']
            else:
                new_conditions = []

            if origin_req:
                origin_status = origin_req['status']
            else:
                origin_status = None

            retry = True
            retry_num = 0
            while retry:
                retry = False
                retry_num += 1
                try:
                    _, new_tf_ids, update_tf_ids = core_requests.update_request_with_transforms(req['request_id'], req['parameters'],
                                                                                                origin_status=origin_status,
                                                                                                new_transforms=new_transforms,
                                                                                                update_transforms=update_transforms,
                                                                                                new_conditions=new_conditions)
                except exceptions.DatabaseException as ex:
                    if 'ORA-00060' in str(ex):
                        self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
                        if retry_num < 5:
                            retry = True
                            if retry_num <= 1:
                                random_sleep = random.randint(1, 10)
                            elif retry_num <= 2:
                                random_sleep = random.randint(1, 60)
                            else:
                                random_sleep = random.randint(1, 120)
                            time.sleep(random_sleep)
                        else:
                            raise ex
                    else:
                        raise ex
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            try:
                req_parameters = {'status': RequestStatus.Transforming,
                                  'locking': RequestLocking.Idle}
                if 'new_retries' in req['parameters']:
                    req_parameters['new_retries'] = req['parameters']['new_retries']
                if 'update_retries' in req['parameters']:
                    req_parameters['update_retries'] = req['parameters']['update_retries']
                if 'errors' in req['parameters']:
                    req_parameters['errors'] = req['parameters']['errors']

                if origin_req:
                    origin_status = origin_req['status']
                else:
                    origin_status = None

                self.logger.warn(log_pre + "Update request in exception: %s" % str(req_parameters))
                core_requests.update_request_with_transforms(req['request_id'], req_parameters, origin_status=origin_status)
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return new_tf_ids, update_tf_ids

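    # Worker entry point for new requests: pick the proper handler (build, iworkflow,
    # generic or classic workflow), persist the result and emit QueueTransformEvent /
    # UpdateTransformEvent for the transforms that were created or changed.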
    def process_new_request(self, event=None, request=None):
        self.number_workers += 1
        try:
            if request is None and event:
                req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built, RequestStatus.Throttling]
                req = self.get_request(request_id=event._request_id, status=req_status, locking=True)
                if not req:
                    self.logger.error("Cannot find request for event: %s" % str(event))
                request = req

            if request:
                req = request
                log_pre = self.get_log_prefix(req)
                self.logger.info(f"{log_pre} process_new_request request: {req['request_id']} event: {event}")
                if self.has_to_build_work(req):
                    ret = self.handle_build_request(req)
                elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
                    ret = self.handle_new_irequest(req)
                elif req['request_type'] in [RequestType.GenericWorkflow]:
                    ret = self.handle_new_generic_request(req)
                else:
                    ret = self.handle_new_request(req)
                new_tf_ids, update_tf_ids = self.update_request(ret, origin_req=req)
                for tf_id in new_tf_ids:
                    self.logger.info(log_pre + "QueueTransformEvent(transform_id: %s)" % str(tf_id))
                    event = QueueTransformEvent(publisher_id=self.id, transform_id=tf_id)
                    self.event_bus.send(event)

                for tf_id in update_tf_ids:
                    self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % str(tf_id))
                    event = UpdateTransformEvent(publisher_id=self.id, transform_id=tf_id)
                    self.event_bus.send(event)
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
        self.number_workers -= 1

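    # Reduce the per-transform statuses to a request status: any new transform keeps the
    # request Transforming; otherwise all-finished, all-failed and all-terminated map to
    # Finished, Failed and SubFinished (or Cancelled when an abort was requested).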
    def get_workflow_status(self, wf, tf_statuses, has_new_transforms, to_abort):
        if has_new_transforms:
            return RequestStatus.Transforming
        terminated_status = [
            TransformStatus.Finished,
            TransformStatus.SubFinished,
            TransformStatus.Failed,
            TransformStatus.Cancelled,
            TransformStatus.Suspended,
            TransformStatus.Expired,
            TransformStatus.Built
        ]
        finished_status = [TransformStatus.Finished]
        failed_status = [
            TransformStatus.Failed,
            TransformStatus.Cancelled,
            TransformStatus.Suspended,
            TransformStatus.Expired,
        ]

        all_terminated = all(status in terminated_status for status in tf_statuses)
        all_finished = all(status in finished_status for status in tf_statuses)
        all_failed = all(status in failed_status for status in tf_statuses)

        if all_finished:
            return RequestStatus.Finished
        elif all_failed:
            return RequestStatus.Failed
        elif all_terminated:
            if to_abort:
                return RequestStatus.Cancelled
            return RequestStatus.SubFinished
        return RequestStatus.Transforming

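    # Poll a running request: synchronize every work with its transform record, create
    # transforms for works that became ready, recompute the request status and trigger
    # an ExpireRequestEvent when the request has passed its expiration time.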
    def handle_update_request_real(self, req, event):
        """
        process running request
        """
        log_pre = self.get_log_prefix(req)
        self.logger.info(log_pre + " handle_update_request: request_id: %s" % req['request_id'])
        if 'workflow' in req['request_metadata']:
            wf = req['request_metadata']['workflow']
        else:
            wf = req['request_metadata']['build_workflow']

        to_abort = False
        to_abort_transform_id = None
        if (event and event._content and 'cmd_type' in event._content and event._content['cmd_type']
                and event._content['cmd_type'] in [CommandType.AbortRequest, CommandType.ExpireRequest]):
            to_abort = True
            self.logger.info(log_pre + "to_abort: %s" % to_abort)
            if (event and event._content and 'cmd_content' in event._content and event._content['cmd_content']
                    and 'transform_id' in event._content['cmd_content']):
                to_abort_transform_id = event._content['cmd_content']['transform_id']
                self.logger.info(log_pre + "to_abort_transform_id: %s" % to_abort_transform_id)

        if to_abort and not to_abort_transform_id:
            wf.to_cancel = True

        works = wf.get_all_works()

        all_released_work_status = []
        for work in works:
            found_match_works = False
            if work.get_work_id():
                tf = core_transforms.get_transform(transform_id=work.get_work_id(), request_id=req['request_id'])
                if tf:
                    all_released_work_status.append(tf['status'])
                    transform_work = tf['transform_metadata']['work']

                    work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
                    self.logger.info(log_pre + "transform status: %s, work status: %s" % (tf['status'], work.status))
                    found_match_works = True
            if not found_match_works:
                tfs = core_transforms.get_transforms(request_id=req['request_id'], internal_ids=work.internal_id, loop_index=work.get_loop_index())
                if not tfs:
                    self.logger.info(f"{log_pre} Found transforms with request_id {req['request_id']} internal_ids [{work.internal_id}]: {tfs}")
                else:
                    tf_ids = [tf['transform_id'] for tf in tfs]
                    self.logger.info(f"{log_pre} Found transforms with request_id {req['request_id']} internal_ids [{work.internal_id}]: {tf_ids}")
                if len(tfs) == 1:
                    tf = tfs[0]
                    all_released_work_status.append(tf['status'])
                    transform_work = tf['transform_metadata']['work']
                    if transform_work.internal_id == work.internal_id:
                        if hasattr(work, 'set_work_id'):
                            work.set_work_id(tf['transform_id'], transforming=True)
                        work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
                        self.logger.info(log_pre + "transform status: %s, work status: %s" % (tf['status'], work.status))
                    else:
                        all_released_work_status.append(None)
                else:
                    all_released_work_status.append(None)

        wf.refresh_works(clean=True)

        new_transforms = []
        self.logger.info(log_pre + f"request status: {req['status']} and to_cancel: {wf.to_cancel}")

        has_new_transforms = False
        if True:
            works = wf.get_new_works()
            for work in works:
                tfs = core_transforms.get_transforms(request_id=req['request_id'], internal_ids=work.internal_id, loop_index=work.get_loop_index())
                if not tfs:
                    self.logger.info(f"{log_pre} Found transforms for new work with request_id {req['request_id']} internal_ids [{work.internal_id}]: {tfs}")
                else:
                    tf_ids = [tf['transform_id'] for tf in tfs]
                    self.logger.info(f"{log_pre} Found transforms for new work with request_id {req['request_id']} internal_ids [{work.internal_id}]: {tf_ids}")
                if len(tfs) == 1:
                    tf = tfs[0]
                    transform_work = tf['transform_metadata']['work']
                    if transform_work.internal_id == work.internal_id:
                        if hasattr(work, 'set_work_id'):
                            work.set_work_id(tf['transform_id'], transforming=True)
                        work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
                        self.logger.info(log_pre + "transform status: %s, work status: %s" % (tf['status'], work.status))
                else:
                    has_new_transforms = True

                    new_work = work
                    new_work.add_proxy(wf.get_proxy())
                    new_transform = self.generate_transform(req, new_work)
                    new_transforms.append(new_transform)
            self.logger.debug(log_pre + " Processing request(%s): new transforms: %s" % (req['request_id'], str(new_transforms)))

        req_status = self.get_workflow_status(wf, all_released_work_status, has_new_transforms, to_abort)
        """
        if wf.is_terminated():
            if wf.is_finished(synchronize=False):
                req_status = RequestStatus.Finished
            else:
                if to_abort and not to_abort_transform_id:
                    req_status = RequestStatus.Cancelled
                elif wf.is_expired(synchronize=False):
                    req_status = RequestStatus.Expired
                elif wf.is_subfinished(synchronize=False):
                    req_status = RequestStatus.SubFinished
                elif wf.is_failed(synchronize=False):
                    req_status = RequestStatus.Failed
                else:
                    req_status = RequestStatus.Failed

            # req_msg = wf.get_terminated_msg()
        """
        if req_status in [RequestStatus.Transforming]:
            if wf.is_to_expire(req['expired_at'], self.pending_time, request_id=req['request_id']):
                wf.expired = True
                event_content = {'request_id': req['request_id'],
                                 'cmd_type': CommandType.ExpireRequest,
                                 'cmd_content': {}}
                self.logger.debug(log_pre + "ExpireRequestEvent(request_id: %s)" % req['request_id'])
                event = ExpireRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event_content)
                self.event_bus.send(event)

        parameters = {'status': req_status,
                      'locking': RequestLocking.Idle,
                      'request_metadata': req['request_metadata']
                      }
        parameters = self.load_poll_period(req, parameters)

        ret = {'request_id': req['request_id'],
               'parameters': parameters,
               'new_transforms': new_transforms}
        self.logger.info(log_pre + "Handle update request result: %s" % str(ret))
        return ret

    def handle_update_build_request_real(self, req, event):
        """
        process build request
        """
        log_pre = self.get_log_prefix(req)
        self.logger.info(log_pre + " handle_update_build_request: request_id: %s" % req['request_id'])
        wf = req['request_metadata']['build_workflow']

        to_abort = False
        to_abort_transform_id = None
        if (event and event._content and 'cmd_type' in event._content and event._content['cmd_type']
                and event._content['cmd_type'] in [CommandType.AbortRequest, CommandType.ExpireRequest]):
            to_abort = True
            self.logger.info(log_pre + "to_abort: %s" % to_abort)
            if (event and event._content and 'cmd_content' in event._content and event._content['cmd_content']
                    and 'transform_id' in event._content['cmd_content']):
                to_abort_transform_id = event._content['cmd_content']['transform_id']
                self.logger.info(log_pre + "to_abort_transform_id: %s" % to_abort_transform_id)

        if to_abort and not to_abort_transform_id:
            wf.to_cancel = True

        works = wf.get_all_works()

        finished_build_transforms = []
        for work in works:
            tf = core_transforms.get_transform(transform_id=work.get_work_id())
            if tf:
                transform_work = tf['transform_metadata']['work']

                work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
                self.logger.info(log_pre + "transform status: %s, work status: %s" % (tf['status'], work.status))
                if tf['status'] in [TransformStatus.Finished]:
                    finished_build_transforms.append(tf['transform_id'])
        wf.refresh_works()

        new_transforms = []
        if req['status'] in [RequestStatus.Building] and not wf.to_cancel:
            works = wf.get_new_works()
            for work in works:
                new_work = work
                new_work.add_proxy(wf.get_proxy())
                new_transform = self.generate_transform(req, new_work, build=True)
                new_transforms.append(new_transform)
            self.logger.debug(log_pre + " Processing build request(%s): new transforms: %s" % (req['request_id'], str(new_transforms)))

        req_status = RequestStatus.Building
        if wf.is_terminated():
            if wf.is_finished(synchronize=False):
                if finished_build_transforms:
                    req_status = RequestStatus.Built
                else:
                    req_status = RequestStatus.Failed
            else:
                if to_abort and not to_abort_transform_id:
                    req_status = RequestStatus.Cancelled
                elif wf.is_expired(synchronize=False):
                    req_status = RequestStatus.Expired
                elif wf.is_subfinished(synchronize=False):
                    req_status = RequestStatus.SubFinished
                elif wf.is_failed(synchronize=False):
                    req_status = RequestStatus.Failed
                else:
                    req_status = RequestStatus.Failed

        else:
            if wf.is_to_expire(req['expired_at'], self.pending_time, request_id=req['request_id']):
                wf.expired = True
                event_content = {'request_id': req['request_id'],
                                 'cmd_type': CommandType.ExpireRequest,
                                 'cmd_content': {}}
                self.logger.debug(log_pre + "ExpireRequestEvent(request_id: %s)" % req['request_id'])
                event = ExpireRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event_content)
                self.event_bus.send(event)

        parameters = {'status': req_status,
                      'locking': RequestLocking.Idle,
                      'request_metadata': req['request_metadata']
                      }
        parameters = self.load_poll_period(req, parameters)

        ret = {'request_id': req['request_id'],
               'parameters': parameters,
               'new_transforms': new_transforms}
        self.logger.info(log_pre + "Handle update build request result: %s" % str(ret))
        return ret

    def handle_update_request(self, req, event):
        """
        process running request
        """
        try:
            if req['status'] in [RequestStatus.Building]:
                ret_req = self.handle_update_build_request_real(req, event=event)
            else:
                ret_req = self.handle_update_request_real(req, event)
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            retries = req['update_retries'] + 1
            if not req['max_update_retries'] or retries < req['max_update_retries']:
                req_status = req['status']
            else:
                req_status = RequestStatus.Failed
            error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}

            update_poll_period = int(req['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if update_poll_period > self.max_update_poll_period:
                update_poll_period = self.max_update_poll_period

            ret_req = {'request_id': req['request_id'],
                       'parameters': {'status': req_status,
                                      'locking': RequestLocking.Idle,
                                      'update_retries': retries,
                                      'update_poll_period': update_poll_period,
                                      'errors': req['errors'] if req['errors'] else {}}}
            ret_req['parameters']['errors'].update(error)
            log_pre = self.get_log_prefix(req)
            self.logger.warn(log_pre + "Handle update request exception result: %s" % str(ret_req))
        return ret_req

    def is_to_expire(self, expired_at=None, pending_time=None, request_id=None):
        if expired_at:
            if type(expired_at) in [str]:
                expired_at = str_to_date(expired_at)
            if expired_at < datetime.datetime.utcnow():
                self.logger.info("Request(%s) expired_at(%s) is smaller than utc now(%s), expiring" % (request_id,
                                                                                                       expired_at,
                                                                                                       datetime.datetime.utcnow()))
                return True
        return False

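    # Poll a running irequest: count the terminal states of its transforms and derive the
    # request status; iWorkflowLocal requests are finished once their max_walltime has passed.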
    def handle_update_irequest_real(self, req, event):
        """
        process running request
        """
        log_pre = self.get_log_prefix(req)
        self.logger.info(log_pre + " handle_update_irequest: request_id: %s" % req['request_id'])

        tfs = core_transforms.get_transforms(request_id=req['request_id'])
        total_tfs, finished_tfs, subfinished_tfs, failed_tfs = 0, 0, 0, 0
        for tf in tfs:
            total_tfs += 1
            if tf['status'] in [TransformStatus.Finished, TransformStatus.Built]:
                finished_tfs += 1
            elif tf['status'] in [TransformStatus.SubFinished]:
                subfinished_tfs += 1
            elif tf['status'] in [TransformStatus.Failed, TransformStatus.Cancelled,
                                  TransformStatus.Suspended, TransformStatus.Expired]:
                failed_tfs += 1

        req_status = RequestStatus.Transforming
        if req['request_type'] in [RequestType.iWorkflowLocal]:
            workflow = req['request_metadata'].get('workflow', None)
            if workflow and req['created_at'] + datetime.timedelta(seconds=workflow.max_walltime) < datetime.datetime.utcnow():
                req_status = RequestStatus.Finished
        else:
            if total_tfs == finished_tfs:
                req_status = RequestStatus.Finished
            elif total_tfs == finished_tfs + subfinished_tfs + failed_tfs:
                if finished_tfs + subfinished_tfs > 0:
                    req_status = RequestStatus.SubFinished
                else:
                    req_status = RequestStatus.Failed

1478 log_msg = log_pre + "irequest %s status: %s" % (req['request_id'], req_status)
1479 log_msg = log_msg + " (transforms: total: %s, finished: %s, subfinished: %s, failed: %s)" % (total_tfs, finished_tfs, subfinished_tfs, failed_tfs)
1480 self.logger.debug(log_msg)
1481
1482 if req_status not in [RequestStatus.Finished, RequestStatus.SubFinished, RequestStatus.Failed]:
1483 if self.is_to_expire(req['expired_at'], self.pending_time, request_id=req['request_id']):
1484 event_content = {'request_id': req['request_id'],
1485 'cmd_type': CommandType.ExpireRequest,
1486 'cmd_content': {}}
1487 self.logger.debug(log_pre + "ExpireRequestEvent(request_id: %s)" % req['request_id'])
1488 event = ExpireRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event_content)
1489 self.event_bus.send(event)
1490
1491 parameters = {'status': req_status,
1492 'locking': RequestLocking.Idle,
1493 'request_metadata': req['request_metadata']
1494 }
1495 parameters = self.load_poll_period(req, parameters)
1496
1497 ret = {'request_id': req['request_id'],
1498 'parameters': parameters}
1499 self.logger.info(log_pre + "Handle update irequest result: %s" % str(ret))
1500 return ret
1501
1502 def handle_update_irequest(self, req, event):
1503 """
1504 Process a running irequest, falling back to retry/failure bookkeeping on errors.
1505 """
1506 try:
1507 ret_req = self.handle_update_irequest_real(req, event)
1508 except Exception as ex:
1509 self.logger.error(ex)
1510 self.logger.error(traceback.format_exc())
1511 retries = req['update_retries'] + 1
1512 if not req['max_update_retries'] or retries < req['max_update_retries']:
1513 req_status = req['status']
1514 else:
1515 req_status = RequestStatus.Failed
1516 error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1517
1518
1519 update_poll_period = int(req['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1520 if update_poll_period > self.max_update_poll_period:
1521 update_poll_period = self.max_update_poll_period
1522
1523 ret_req = {'request_id': req['request_id'],
1524 'parameters': {'status': req_status,
1525 'locking': RequestLocking.Idle,
1526 'update_retries': retries,
1527 'update_poll_period': update_poll_period,
1528 'errors': req['errors'] if req['errors'] else {}}}
1529 ret_req['parameters']['errors'].update(error)
1530 log_pre = self.get_log_prefix(req)
1531 self.logger.warn(log_pre + "Handle update irequest exception result: %s" % str(ret_req))
1532 return ret_req
1533
1534 def process_update_request(self, event=None, request=None):
1535 self.number_workers += 1
1536 pro_ret = ReturnCode.Ok.value
1537 try:
1538 if request is None and event:
1539
1540
1541
1542
1543
1544
1545
1546
1547 self.logger.debug("process_update_request: event: %s" % str(event))
1548 req = self.get_request(request_id=event._request_id, locking=True)
1549 if not req:
1550 self.logger.error("Cannot find request for event: %s" % str(event))
1551
1552 pro_ret = ReturnCode.Ok.value
1553 else:
1554 request = req
1555
1556 if request:
1557 req = request
1558 log_pre = self.get_log_prefix(req)
1559 self.logger.info(f"{log_pre} process_update_request request: {req['request_id']} event: {event}")
1560
1561 if req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1562 ret = self.handle_update_irequest(req, event=event)
1563 else:
1564 ret = self.handle_update_request(req, event=event)
1565 new_tf_ids, update_tf_ids = self.update_request(ret, origin_req=req)
1566 for tf_id in new_tf_ids:
1567
1568
1569 self.logger.info(log_pre + "QueueTransformEvent(transform_id: %s)" % str(tf_id))
1570 tf_event = QueueTransformEvent(publisher_id=self.id, transform_id=tf_id)
1571 self.event_bus.send(tf_event)
1572 for tf_id in update_tf_ids:
1573 self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % tf_id)
1574 tf_event = UpdateTransformEvent(publisher_id=self.id, transform_id=tf_id, content=event._content if event else None)
1575 self.event_bus.send(tf_event)
1576 except Exception as ex:
1577 self.logger.error(ex)
1578 self.logger.error(traceback.format_exc())
1579 pro_ret = ReturnCode.Failed.value
1580 self.number_workers -= 1
1581 return pro_ret
1582
1583 def handle_abort_request(self, req, event):
1584 """
1585 Process an abort command for a request.
1586 """
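# Behaviour sketch (as implemented below): if the abort command targets a single
# transform_id, the request status is left unchanged; otherwise the workflow is
# flagged with to_cancel and the request moves to Cancelling. In both cases the
# substatus is set to ToCancel and the request lock is released.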
1587 try:
1588 log_pre = self.get_log_prefix(req)
1589 self.logger.info(log_pre + "handle_abort_request event: %s" % str(event))
1590
1591 to_abort = False
1592 to_abort_transform_id = None
1593 if (event and event._content and 'cmd_type' in event._content and event._content['cmd_type']
1594 and event._content['cmd_type'] in [CommandType.AbortRequest, CommandType.ExpireRequest]):
1595 to_abort = True
1596 self.logger.info(log_pre + "to_abort: %s" % to_abort)
1597 if (event and event._content and 'cmd_content' in event._content and event._content['cmd_content']
1598 and 'transform_id' in event._content['cmd_content']):
1599 to_abort_transform_id = event._content['cmd_content']['transform_id']
1600 self.logger.info(log_pre + "to_abort_transform_id: %s" % to_abort_transform_id)
1601
1602 if to_abort and to_abort_transform_id:
1603 req_status = req['status']
1604 else:
1605 if req['status'] in [RequestStatus.Building]:
1606 wf = req['request_metadata']['build_workflow']
1607 else:
1608 if 'workflow' in req['request_metadata']:
1609 wf = req['request_metadata']['workflow']
1610 else:
1611 wf = req['request_metadata']['build_workflow']
1612 wf.to_cancel = True
1613 req_status = RequestStatus.Cancelling
1614
1615 ret_req = {'request_id': req['request_id'],
1616 'parameters': {'status': req_status,
1617 'substatus': RequestStatus.ToCancel,
1618 'locking': RequestLocking.Idle,
1619 'request_metadata': req['request_metadata']},
1620 }
1621 self.logger.info(log_pre + "handle_abort_request result: %s" % str(ret_req))
1622 return ret_req
1623 except Exception as ex:
1624 self.logger.error(ex)
1625 self.logger.error(traceback.format_exc())
1626 error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1627 ret_req = {'request_id': req['request_id'],
1628 'parameters': {'status': RequestStatus.ToCancel,
1629 'locking': RequestLocking.Idle,
1630 'errors': req['errors'] if req['errors'] else {}}}
1631 ret_req['parameters']['errors'].update(error)
1632 self.logger.info(log_pre + "handle_abort_request exception result: %s" % str(ret_req))
1633 return ret_req
1634
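# The helper below updates the originating command record (when the event or the
# explicit 'command' id carries one): it stores the new status, attaches optional
# errors, and releases the command lock. Illustrative call, matching how it is
# used elsewhere in this agent:
#   self.handle_command(event, cmd_status=CommandStatus.Processed, errors=None)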
1635 def handle_command(self, event, cmd_status=None, command=None, errors=None):
1636 if (event and event._content and 'cmd_id' in event._content and event._content['cmd_id']):
1637 u_command = {'cmd_id': event._content['cmd_id'],
1638 'status': cmd_status,
1639 'locking': CommandLocking.Idle}
1640 if errors:
1641 u_command['errors'] = errors
1642 core_commands.update_commands([u_command])
1643 if command:
1644 u_command = {'cmd_id': command,
1645 'status': cmd_status,
1646 'locking': CommandLocking.Idle}
1647 if errors:
1648 u_command['errors'] = errors
1649 core_commands.update_commands([u_command])
1650
1651 def process_abort_request(self, event=None, request=None, command=None):
1652 self.number_workers += 1
1653 pro_ret = ReturnCode.Ok.value
1654 try:
1655 if request is None and event:
1656 req = self.get_request(request_id=event._request_id, locking=True)
1657 if not req:
1658 self.logger.warn("Cannot find request for event: %s" % str(event))
1659 pro_ret = ReturnCode.Locked.value
1660 else:
1661 request = req
1662 if request:
1663 req = request
1664 log_pre = self.get_log_prefix(req)
1665 self.logger.info(log_pre + f"process_abort_request request: {req['request_id']}, event: {event}, command: {command}")
1666
1667 if req['status'] in [RequestStatus.Finished, RequestStatus.SubFinished,
1668 RequestStatus.Failed, RequestStatus.Cancelled,
1669 RequestStatus.Suspended, RequestStatus.Expired]:
1670 ret = {'request_id': req['request_id'],
1671 'parameters': {'locking': RequestLocking.Idle,
1672 'command': CommandType.AbortRequest,
1673 'errors': {'extra_msg': "Request is already terminated. Cannot be aborted"}}}
1674 if req['errors'] and 'msg' in req['errors']:
1675 ret['parameters']['errors']['msg'] = req['errors']['msg']
1676 self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
1677 self.update_request(ret, origin_req=req)
1678 self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors="Request is already terminated. Cannot be aborted")
1679 elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1680 ret = self.handle_close_irequest(req, event=event)
1681 self.update_request(ret, origin_req=req)
1682
1683
1684 self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
1685 else:
1686 ret = self.handle_abort_request(req, event)
1687 self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
1688 self.update_request(ret, origin_req=req)
1689
1690 core_transforms.abort_resume_transforms(request_id=req['request_id'], abort=True)
1691 core_processings.abort_resume_processings(request_id=req['request_id'], abort=True)
1692
1693 to_abort_transform_id = None
1694 if event and event._content and event._content.get('cmd_content') and 'transform_id' in event._content['cmd_content']:
1695 to_abort_transform_id = event._content['cmd_content']['transform_id']
1696
1697 if req['status'] in [RequestStatus.Building]:
1698 wf = req['request_metadata']['build_workflow']
1699 else:
1700 if 'workflow' in req['request_metadata']:
1701 wf = req['request_metadata']['workflow']
1702 else:
1703 wf = req['request_metadata']['build_workflow']
1704 works = wf.get_all_works()
1705 if works:
1706 has_abort_work = False
1707 for work in works:
1708 if (work.is_started() or work.is_starting()) and not work.is_terminated():
1709 if not to_abort_transform_id or to_abort_transform_id == work.get_work_id():
1710 self.logger.info(log_pre + "AbortTransformEvent(transform_id: %s)" % str(work.get_work_id()))
1711 event = AbortTransformEvent(publisher_id=self.id,
1712 transform_id=work.get_work_id(),
1713 content=event._content if event else None)
1714 self.event_bus.send(event)
1715 has_abort_work = True
1716 if not has_abort_work:
1717 self.logger.info(log_pre + "no work to abort")
1718 self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
1719 event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
1720 self.event_bus.send(event)
1721 else:
1722
1723 self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
1724 event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
1725 self.event_bus.send(event)
1726
1727 self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
1728 except AssertionError as ex:
1729 self.logger.error("process_abort_request, Failed to process event: %s" % str(event))
1730 self.logger.error(ex)
1731 self.logger.error(traceback.format_exc())
1732 self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=str(ex))
1733 pro_ret = ReturnCode.Failed.value
1734 except Exception as ex:
1735 self.logger.error(ex)
1736 self.logger.error(traceback.format_exc())
1737 pro_ret = ReturnCode.Failed.value
1738 self.number_workers -= 1
1739 return pro_ret
1740
1741 def handle_close_irequest(self, req, event):
1742 """
1743 Process a close command for an irequest.
1744 """
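# Sketch of the close path below: transforms that are not yet terminated receive
# an AbortTransformEvent; the request status is then rolled up from the transform
# counts (Finished / SubFinished / Failed / Transforming) and the substatus is
# set to ToClose.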
1745 try:
1746 log_pre = self.get_log_prefix(req)
1747 self.logger.info(log_pre + "handle_close_irequest event: %s" % str(event))
1748
1749 tfs = core_transforms.get_transforms(request_id=req['request_id'])
1750 total_tfs, finished_tfs, subfinished_tfs, failed_tfs = 0, 0, 0, 0
1751 for tf in tfs:
1752 total_tfs += 1
1753 if tf['status'] in [TransformStatus.Finished, TransformStatus.Built]:
1754 finished_tfs += 1
1755 elif tf['status'] in [TransformStatus.SubFinished]:
1756 subfinished_tfs += 1
1757 elif tf['status'] in [TransformStatus.Failed, TransformStatus.Cancelled,
1758 TransformStatus.Suspended, TransformStatus.Expired]:
1759 failed_tfs += 1
1760 else:
1761 abort_event = AbortTransformEvent(publisher_id=self.id,
1762 transform_id=tf['transform_id'],
1763 content=event._content if event else None)
1764 self.event_bus.send(abort_event)
1765
1766 req_status = RequestStatus.Transforming
1767 if req['request_type'] in [RequestType.iWorkflowLocal] and total_tfs == 0:
1768 req_status = RequestStatus.Finished
1769 else:
1770 if total_tfs == finished_tfs:
1771 req_status = RequestStatus.Finished
1772 elif total_tfs == finished_tfs + subfinished_tfs + failed_tfs:
1773 if finished_tfs + subfinished_tfs > 0:
1774 req_status = RequestStatus.SubFinished
1775 else:
1776 req_status = RequestStatus.Failed
1777
1778 log_msg = log_pre + "irequest %s status: %s" % (req['request_id'], req_status)
1779 log_msg = log_msg + " (transforms: total: %s, finished: %s, subfinished: %s, failed: %s)" % (total_tfs, finished_tfs, subfinished_tfs, failed_tfs)
1780 self.logger.debug(log_msg)
1781
1782 parameters = {'status': req_status,
1783 'substatus': RequestStatus.ToClose,
1784 'locking': RequestLocking.Idle,
1785 'request_metadata': req['request_metadata']
1786 }
1787 parameters = self.load_poll_period(req, parameters)
1788
1789 ret = {'request_id': req['request_id'],
1790 'parameters': parameters}
1791 self.logger.info(log_pre + "Handle close irequest result: %s" % str(ret))
1792 return ret
1793
1794 except Exception as ex:
1795 self.logger.error(ex)
1796 self.logger.error(traceback.format_exc())
1797 error = {'close_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1798 ret_req = {'request_id': req['request_id'],
1799 'parameters': {'status': RequestStatus.ToClose,
1800 'locking': RequestLocking.Idle,
1801 'errors': req['errors'] if req['errors'] else {}}}
1802 ret_req['parameters']['errors'].update(error)
1803 self.logger.info(log_pre + "handle_close_irequest exception result: %s" % str(ret_req))
1804 return ret_req
1805
1806 def process_close_request(self, event=None, request=None, command=None):
1807 self.number_workers += 1
1808 pro_ret = ReturnCode.Ok.value
1809 try:
1810 if request is None and event:
1811 req = self.get_request(request_id=event._request_id, locking=True)
1812 if not req:
1813 self.logger.warn("Cannot find request for event: %s" % str(event))
1814 pro_ret = ReturnCode.Locked.value
1815 else:
1816 request = req
1817 if request:
1818 req = request
1819 log_pre = self.get_log_prefix(req)
1820 self.logger.info(log_pre + f"process_close_request request: {req['request_id']} event: {event}")
1821
1822 if req['status'] in [RequestStatus.Finished, RequestStatus.SubFinished,
1823 RequestStatus.Failed, RequestStatus.Cancelled,
1824 RequestStatus.Suspended, RequestStatus.Expired]:
1825 ret = {'request_id': req['request_id'],
1826 'parameters': {'locking': RequestLocking.Idle,
1827 'command': CommandType.CloseRequest,
1828 'errors': {'extra_msg': "Request is already terminated. Cannot be closed"}}}
1829 if req['errors'] and 'msg' in req['errors']:
1830 ret['parameters']['errors']['msg'] = req['errors']['msg']
1831 self.logger.info(log_pre + "process_close_request result: %s" % str(ret))
1832 self.update_request(ret, origin_req=req)
1833 self.handle_command(event, command=command, cmd_status=CommandStatus.Failed, errors="Request is already terminated. Cannot be closed")
1834 else:
1835 if req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1836 ret = self.handle_close_irequest(req, event=event)
1837 self.update_request(ret, origin_req=req)
1838 else:
1840 ret = self.handle_abort_request(req, event)
1841 self.logger.info(log_pre + "process_close_request result: %s" % str(ret))
1842 self.update_request(ret, origin_req=req)
1843 to_abort_transform_id = None
1844 if event and event._content and event._content.get('cmd_content') and 'transform_id' in event._content['cmd_content']:
1845 to_abort_transform_id = event._content['cmd_content']['transform_id']
1846
1847 if req['status'] in [RequestStatus.Building]:
1848 wf = req['request_metadata']['build_workflow']
1849 else:
1850 if 'workflow' in req['request_metadata']:
1851 wf = req['request_metadata']['workflow']
1852 else:
1853 wf = req['request_metadata']['build_workflow']
1854 works = wf.get_all_works()
1855 if works:
1856 has_abort_work = False
1857 for work in works:
1858 if (work.is_started() or work.is_starting()) and not work.is_terminated():
1859 if not to_abort_transform_id or to_abort_transform_id == work.get_work_id():
1860 self.logger.info(log_pre + "AbortTransformEvent(transform_id: %s)" % str(work.get_work_id()))
1861 event = AbortTransformEvent(publisher_id=self.id,
1862 transform_id=work.get_work_id(),
1863 content=event._content if event else None)
1864 self.event_bus.send(event)
1865 has_abort_work = True
1866 if not has_abort_work:
1867 self.logger.info(log_pre + "no work to abort")
1868 self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
1869 event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
1870 self.event_bus.send(event)
1871 else:
1872
1873 self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
1874 event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
1875 self.event_bus.send(event)
1876
1877 self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
1878 except AssertionError as ex:
1879 self.logger.error("process_close_request, Failed to process event: %s" % str(event))
1880 self.logger.error(ex)
1881 self.logger.error(traceback.format_exc())
1882 self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=str(ex))
1883 pro_ret = ReturnCode.Failed.value
1884 except Exception as ex:
1885 self.logger.error(ex)
1886 self.logger.error(traceback.format_exc())
1887 pro_ret = ReturnCode.Failed.value
1888 self.number_workers -= 1
1889 return pro_ret
1890
1891 def handle_resume_irequest(self, req, event):
1892 """
1893 Process a resume command for an irequest.
1894 """
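# Sketch of the resume path below: every transform that is not already Finished
# or Built receives a ResumeTransformEvent, and the request is put back to
# Transforming with substatus ToResume.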
1895 try:
1896 log_pre = self.get_log_prefix(req)
1897 self.logger.info(log_pre + "handle_resume_irequest event: %s" % str(event))
1898
1899 tfs = core_transforms.get_transforms(request_id=req['request_id'])
1900 for tf in tfs:
1901 if tf['status'] in [TransformStatus.Finished, TransformStatus.Built]:
1902 continue
1903 else:
1904 resume_event = ResumeTransformEvent(publisher_id=self.id,
1905 transform_id=tf['transform_id'],
1906 content=event._content if event else None)
1907 self.event_bus.send(resume_event)
1908
1909 parameters = {'status': RequestStatus.Transforming,
1910 'substatus': RequestStatus.ToResume,
1911 'locking': RequestLocking.Idle,
1912 'request_metadata': req['request_metadata']
1913 }
1914 parameters = self.load_poll_period(req, parameters)
1915
1916 ret = {'request_id': req['request_id'],
1917 'parameters': parameters}
1918 self.logger.info(log_pre + "Handle resume irequest result: %s" % str(ret))
1919 return ret
1920 except Exception as ex:
1921 self.logger.error(ex)
1922 self.logger.error(traceback.format_exc())
1923 error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1924 ret_req = {'request_id': req['request_id'],
1925 'parameters': {'status': RequestStatus.ToResume,
1926 'locking': RequestLocking.Idle,
1927 'errors': req['errors'] if req['errors'] else {}}}
1928 ret_req['parameters']['errors'].update(error)
1929 self.logger.info(log_pre + "handle_resume_irequest exception result: %s" % str(ret_req))
1930 return ret_req
1931
1932 def handle_resume_request(self, req):
1933 """
1934 Process a resume command for a request.
1935 """
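# Sketch of the resume path below: a request with a workflow has all of its works
# resumed and goes to Resuming; a request that only carries a build_workflow goes
# back to Building; a request with neither is marked Failed.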
1936 try:
1937 req_status = RequestStatus.Resuming
1938
1939 processing_metadata = req['processing_metadata']
1940
1941 if 'workflow' in req['request_metadata'] and req['request_metadata']['workflow'] is not None:
1942 wf = req['request_metadata']['workflow']
1943 wf.resume_works()
1944 elif 'build_workflow' in req['request_metadata'] and req['request_metadata']['build_workflow'] is not None:
1945 req_status = RequestStatus.Building
1946 else:
1947 req_status = RequestStatus.Failed
1948
1949 ret_req = {'request_id': req['request_id'],
1950 'parameters': {'status': req_status,
1951 'request_metadata': req['request_metadata'],
1952 'processing_metadata': processing_metadata,
1953 'locking': RequestLocking.Idle},
1954 }
1955 return ret_req
1956 except Exception as ex:
1957 self.logger.error(ex)
1958 self.logger.error(traceback.format_exc())
1959 error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1960 ret_req = {'request_id': req['request_id'],
1961 'parameters': {'status': RequestStatus.ToResume,
1962 'locking': RequestLocking.Idle,
1963 'errors': req['errors'] if req['errors'] else {}}}
1964 ret_req['parameters']['errors'].update(error)
1965 return ret_req
1966
1967 def process_resume_request(self, event=None, request=None, command=None):
1968 self.number_workers += 1
1969 pro_ret = ReturnCode.Ok.value
1970 try:
1971 if request is None and event:
1972 req = self.get_request(request_id=event._request_id, locking=True)
1973 if not req:
1974 self.logger.error("Cannot find request for event: %s" % str(event))
1975 pro_ret = ReturnCode.Locked.value
1976 else:
1977 request = req
1978 if request:
1979 req = request
1980 log_pre = self.get_log_prefix(req)
1981 self.logger.info(log_pre + f"process_resume_request request: {req['request_id']} event: {event}")
1982
1983 if req['status'] in [RequestStatus.Finished]:
1984 ret = {'request_id': req['request_id'],
1985 'parameters': {'locking': RequestLocking.Idle,
1986 'command': CommandType.ResumeRequest,
1987 'errors': {'extra_msg': "Request is already finished. Cannot be resumed"}}}
1988 if req['errors'] and 'msg' in req['errors']:
1989 ret['parameters']['errors']['msg'] = req['errors']['msg']
1990 self.logger.info(log_pre + "process_resume_request result: %s" % str(ret))
1991
1992 self.update_request(ret, origin_req=req)
1993 self.handle_command(event, command=command, cmd_status=CommandStatus.Failed, errors="Request is already finished. Cannot be resumed")
1994 elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1995 ret = self.handle_resume_irequest(req, event=event)
1996 self.update_request(ret, origin_req=req)
1997
1998 self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
1999 else:
2000 ret = self.handle_resume_request(req)
2001 self.logger.info(log_pre + "process_resume_request result: %s" % str(ret))
2002
2003 self.update_request(ret, origin_req=req)
2004 if 'workflow' in req['request_metadata']:
2005 wf = req['request_metadata']['workflow']
2006 works = wf.get_all_works()
2007 if works:
2008 for work in works:
2009
2010 self.logger.info(log_pre + "ResumeTransformEvent(transform_id: %s)" % str(work.get_work_id()))
2011 event = ResumeTransformEvent(publisher_id=self.id,
2012 transform_id=work.get_work_id(),
2013 content=event._content if event else None)
2014 self.event_bus.send(event)
2015 else:
2016 self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
2017 event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
2018 self.event_bus.send(event)
2019
2020 core_transforms.abort_resume_transforms(request_id=req['request_id'], resume=True)
2021 core_processings.abort_resume_processings(request_id=req['request_id'], resume=True)
2022
2023 self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
2024 except Exception as ex:
2025 self.logger.error(ex)
2026 self.logger.error(traceback.format_exc())
2027 pro_ret = ReturnCode.Failed.value
2028 self.number_workers -= 1
2029 return pro_ret
2030
2031 def clean_locks(self, force=False):
2032 try:
2033 self.logger.info(f"clean locking: force: {force}")
2034 health_items = self.get_health_items()
2035 min_request_id = BaseAgent.min_request_id
2036 hostname, pid, thread_id, thread_name = self.get_process_thread_info()
2037 core_requests.clean_locking(health_items=health_items, min_request_id=min_request_id,
2038 time_period=self.clean_locks_time_period,
2039 force=force, hostname=hostname, pid=pid)
2040 except Exception as ex:
2041 self.logger.warn(f"Failed to clean locks: {ex}")
2042
2043 def init_event_function_map(self):
2044 self.event_func_map = {
2045 EventType.NewRequest: {
2046 'pre_check': self.is_ok_to_run_more_requests,
2047 'exec_func': self.process_new_request
2048 },
2049 EventType.UpdateRequest: {
2050 'pre_check': self.is_ok_to_run_more_requests,
2051 'exec_func': self.process_update_request
2052 },
2053 EventType.AbortRequest: {
2054 'pre_check': self.is_ok_to_run_more_requests,
2055 'exec_func': self.process_abort_request
2056 },
2057 EventType.ExpireRequest: {
2058 'pre_check': self.is_ok_to_run_more_requests,
2059 'exec_func': self.process_abort_request
2060 },
2061 EventType.ResumeRequest: {
2062 'pre_check': self.is_ok_to_run_more_requests,
2063 'exec_func': self.process_resume_request
2064 },
2065 EventType.CloseRequest: {
2066 'pre_check': self.is_ok_to_run_more_requests,
2067 'exec_func': self.process_close_request
2068 }
2069 }
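# Dispatch sketch (an assumption about how BaseAgent consumes this map, not part
# of this module): for an incoming event, the matching 'pre_check' callable is
# evaluated first and, if it allows more work, 'exec_func' is invoked with the
# event, e.g.:
#   handler = self.event_func_map[event._event_type]
#   if handler['pre_check']():
#       handler['exec_func'](event=event)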
2070
2071 def run(self):
2072 """
2073 Main run function.
2074 """
2075 try:
2076 self.logger.info("Starting main thread")
2077 self.init_thread_info()
2078
2079 self.load_plugins()
2080
2081 self.add_default_tasks()
2082 self.clean_locks(force=True)
2083
2084 self.init_event_function_map()
2085
2086 task = self.create_task(task_func=self.get_new_requests, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
2087 self.add_task(task)
2088 task = self.create_task(task_func=self.get_running_requests, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
2089 self.add_task(task)
2090 task = self.create_task(task_func=self.get_operation_requests, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
2091 self.add_task(task)
2092 task = self.create_task(task_func=self.clean_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=3600, priority=1)
2093 self.add_task(task)
2094 task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=60, priority=1)
2095 self.add_task(task)
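# The periodic tasks above poll for new, running and operation requests and clean
# up stale locks and the minimum request id; the delay_time values (10, 60, 3600)
# are read here as repeat intervals in seconds, which is an assumption based on
# the hourly-looking 3600 value.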
2096
2097 self.execute()
2098 except KeyboardInterrupt:
2099 self.stop()
2100
2101
2102 if __name__ == '__main__':
2103 agent = Clerk()
2104 agent()