Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:18

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 import datetime
0012 import random
0013 import time
0014 import traceback
0015 
0016 from idds.common import exceptions
0017 from idds.common.constants import (Sections, ReturnCode,
0018                                    RequestType, RequestStatus, RequestLocking,
0019                                    TransformType, WorkflowType, ConditionStatus,
0020                                    TransformStatus, ProcessingStatus,
0021                                    ContentStatus, ContentRelationType,
0022                                    CommandType, CommandStatus, CommandLocking)
0023 from idds.common.utils import setup_logging, truncate_string, str_to_date
0024 from idds.core import (requests as core_requests,
0025                        transforms as core_transforms,
0026                        processings as core_processings,
0027                        catalog as core_catalog,
0028                        throttlers as core_throttlers,
0029                        commands as core_commands)
0030 from idds.agents.common.baseagent import BaseAgent
0031 from idds.agents.common.eventbus.event import (EventType,
0032                                                # NewRequestEvent,
0033                                                UpdateRequestEvent,
0034                                                # AbortRequestEvent,
0035                                                # CloseRequestEvent,
0036                                                # ResumeRequestEvent,
0037                                                # NewTransformEvent,
0038                                                QueueTransformEvent,
0039                                                UpdateTransformEvent,
0040                                                AbortTransformEvent,
0041                                                ResumeTransformEvent,
0042                                                ExpireRequestEvent)
0043 
0044 from idds.agents.common.cache.redis import get_redis_cache
0045 
0046 
# Call the project's logging setup helper with this module's name.
setup_logging(__name__)
0048 
0049 
0050 class Clerk(BaseAgent):
0051     """
0052     Clerk works to process requests and converts requests to transforms.
0053     """
0054 
0055     def __init__(self, num_threads=1, max_number_workers=8, poll_period=10, retrieve_bulk_size=3, cache_expire_seconds=300, pending_time=None, **kwargs):
0056         self.max_number_workers = max_number_workers
0057         self.set_max_workers()
0058         num_threads = self.max_number_workers
0059         super(Clerk, self).__init__(num_threads=num_threads, name='Clerk', **kwargs)
0060         self.poll_period = int(poll_period)
0061         self.retrieve_bulk_size = int(retrieve_bulk_size)
0062         self.config_section = Sections.Clerk
0063         self.start_at = time.time()
0064 
0065         if pending_time:
0066             self.pending_time = float(pending_time)
0067         else:
0068             self.pending_time = None
0069 
0070         self.cache_expire_seconds = int(cache_expire_seconds)
0071 
0072         if not hasattr(self, 'release_helper') or not self.release_helper:
0073             self.release_helper = False
0074         elif str(self.release_helper).lower() == 'true':
0075             self.release_helper = True
0076         else:
0077             self.release_helper = False
0078 
0079         if not hasattr(self, 'new_poll_period') or not self.new_poll_period:
0080             self.new_poll_period = self.poll_period
0081         else:
0082             self.new_poll_period = int(self.new_poll_period)
0083         if not hasattr(self, 'update_poll_period') or not self.update_poll_period:
0084             self.update_poll_period = self.poll_period
0085         else:
0086             self.update_poll_period = int(self.update_poll_period)
0087         if not hasattr(self, 'throttle_poll_period') or not self.throttle_poll_period:
0088             self.throttle_poll_period = self.poll_period
0089         else:
0090             self.throttle_poll_period = int(self.new_poll_period)
0091 
0092         if hasattr(self, 'poll_period_increase_rate'):
0093             self.poll_period_increase_rate = float(self.poll_period_increase_rate)
0094         else:
0095             self.poll_period_increase_rate = 2
0096 
0097         if hasattr(self, 'max_new_poll_period'):
0098             self.max_new_poll_period = int(self.max_new_poll_period)
0099         else:
0100             self.max_new_poll_period = 3600 * 6
0101         if hasattr(self, 'max_update_poll_period'):
0102             self.max_update_poll_period = int(self.max_update_poll_period)
0103         else:
0104             self.max_update_poll_period = 3600 * 6
0105 
0106         if not hasattr(self, 'new_command_poll_period') or not self.new_command_poll_period:
0107             self.new_command_poll_period = 1
0108         else:
0109             self.new_command_poll_period = int(self.new_command_poll_period)
0110         if not hasattr(self, 'update_command_poll_period') or not self.update_command_poll_period:
0111             self.update_command_poll_period = self.poll_period
0112         else:
0113             self.update_command_poll_period = int(self.update_command_poll_period)
0114 
0115         if hasattr(self, 'max_new_retries'):
0116             self.max_new_retries = int(self.max_new_retries)
0117         else:
0118             self.max_new_retries = 3
0119         if hasattr(self, 'max_update_retries'):
0120             self.max_update_retries = int(self.max_update_retries)
0121         else:
0122             # 0 or None means no limitations.
0123             self.max_update_retries = 0
0124 
0125         self.number_workers = 0
0126         if not hasattr(self, 'max_number_workers') or not self.max_number_workers:
0127             self.max_number_workers = 3
0128         else:
0129             self.max_number_workers = int(self.max_number_workers)
0130 
0131         self.show_queue_size_time = None
0132 
0133         if hasattr(self, 'clean_locks_time_period'):
0134             self.clean_locks_time_period = int(self.clean_locks_time_period)
0135         else:
0136             self.clean_locks_time_period = 1800
0137 
0138     def is_ok_to_run_more_requests(self):
0139         if self.get_num_free_workers() > 0:
0140             return True
0141         return False
0142 
0143     def show_queue_size(self):
0144         if self.show_queue_size_time is None or time.time() - self.show_queue_size_time >= 600:
0145             self.show_queue_size_time = time.time()
0146             q_str = "min request_id: %s, number of requests: %s, max number of requests: %s" % (BaseAgent.min_request_id,
0147                                                                                                 self.get_num_workers(),
0148                                                                                                 self.get_max_workers())
0149             self.logger.debug(q_str)
0150 
0151     def get_bulk_size(self):
0152         return min(self.retrieve_bulk_size, self.get_num_free_workers())
0153 
0154     def get_new_requests(self):
0155         """
0156         Get new requests to process
0157         """
0158         try:
0159             # req_status = [RequestStatus.TransformingOpen]
0160             # reqs_open = core_requests.get_requests_by_status_type(status=req_status, time_period=3600)
0161             # self.logger.info("Main thread get %s TransformingOpen requests to process" % len(reqs_open))
0162 
0163             if not self.is_ok_to_run_more_requests():
0164                 return []
0165 
0166             self.show_queue_size()
0167 
0168             if time.time() < self.start_at + 3600:
0169                 if BaseAgent.poll_new_min_request_id_times % 30 == 0:
0170                     # get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes.
0171                     if BaseAgent.min_request_id:
0172                         min_request_id = BaseAgent.min_request_id - 1000
0173                     else:
0174                         min_request_id = None
0175                 else:
0176                     min_request_id = BaseAgent.min_request_id
0177             else:
0178                 if BaseAgent.poll_new_min_request_id_times % 180 == 0:
0179                     # get_new_requests is called every 10 seconds. 180 * 10 = 300 seconds, which is 30 minutes.
0180                     if BaseAgent.min_request_id:
0181                         min_request_id = BaseAgent.min_request_id - 1000
0182                     else:
0183                         min_request_id = None
0184                 else:
0185                     min_request_id = BaseAgent.min_request_id
0186 
0187             BaseAgent.poll_new_min_request_id_times += 1
0188 
0189             req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built, RequestStatus.Throttling]
0190             reqs_new = core_requests.get_requests_by_status_type(status=req_status, locking=True,
0191                                                                  min_request_id=min_request_id,
0192                                                                  bulk_size=self.get_bulk_size(),
0193                                                                  new_poll=True, only_return_id=False)
0194 
0195             # self.logger.debug("Main thread get %s [New+Extend] requests to process" % len(reqs_new))
0196             if reqs_new:
0197                 req_ids = [req["request_id"] for req in reqs_new]
0198                 self.logger.info("Main thread get [New+Extend] requests to process: %s" % str(req_ids))
0199 
0200             for req in reqs_new:
0201                 req_id = req["request_id"]
0202                 self.submit(self.process_new_request, **{"request": req})
0203                 BaseAgent.min_request_id_cache[req_id] = time.time()
0204                 if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
0205                     BaseAgent.min_request_id = req_id
0206                     self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
0207                     core_requests.set_min_request_id(BaseAgent.min_request_id)
0208             return reqs_new
0209         except exceptions.DatabaseException as ex:
0210             if 'ORA-00060' in str(ex):
0211                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0212             else:
0213                 # raise ex
0214                 self.logger.error(ex)
0215                 self.logger.error(traceback.format_exc())
0216         return []
0217 
0218     def get_running_requests(self):
0219         """
0220         Get running requests
0221         """
0222         try:
0223             if not self.is_ok_to_run_more_requests():
0224                 return []
0225 
0226             self.show_queue_size()
0227 
0228             if time.time() < self.start_at + 3600:
0229                 if BaseAgent.poll_running_min_request_id_times % 30 == 0:
0230                     # get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes.
0231                     if BaseAgent.min_request_id:
0232                         min_request_id = BaseAgent.min_request_id - 1000
0233                     else:
0234                         min_request_id = None
0235                 else:
0236                     min_request_id = BaseAgent.min_request_id
0237             else:
0238                 if BaseAgent.poll_running_min_request_id_times % 180 == 0:
0239                     # get_new_requests is called every 10 seconds. 180 * 10 = 1800 seconds, which is 30 minutes.
0240                     if BaseAgent.min_request_id:
0241                         min_request_id = BaseAgent.min_request_id - 1000
0242                     else:
0243                         min_request_id = None
0244                 else:
0245                     min_request_id = BaseAgent.min_request_id
0246 
0247             BaseAgent.poll_running_min_request_id_times += 1
0248 
0249             req_status = [RequestStatus.Transforming, RequestStatus.ToCancel, RequestStatus.Cancelling,
0250                           RequestStatus.ToSuspend, RequestStatus.Suspending,
0251                           RequestStatus.ToExpire, RequestStatus.Expiring,
0252                           RequestStatus.ToFinish, RequestStatus.ToForceFinish,
0253                           RequestStatus.ToResume, RequestStatus.Resuming,
0254                           RequestStatus.Building, RequestStatus.ToClose]
0255             reqs = core_requests.get_requests_by_status_type(status=req_status, time_period=None,
0256                                                              min_request_id=min_request_id,
0257                                                              locking=True,
0258                                                              bulk_size=self.get_bulk_size(),
0259                                                              update_poll=True, only_return_id=False)
0260 
0261             # self.logger.debug("Main thread get %s Transforming requests to running" % len(reqs))
0262             if reqs:
0263                 req_ids = [req["request_id"] for req in reqs]
0264                 self.logger.info("Main thread get Transforming requests to running: %s" % str(req_ids))
0265 
0266             for req in reqs:
0267                 req_id = req["request_id"]
0268                 self.submit(self.process_update_request, **{"request": req})
0269 
0270                 BaseAgent.min_request_id_cache[req_id] = time.time()
0271                 if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
0272                     BaseAgent.min_request_id = req_id
0273                     self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
0274                     core_requests.set_min_request_id(BaseAgent.min_request_id)
0275 
0276             return reqs
0277         except exceptions.DatabaseException as ex:
0278             if 'ORA-00060' in str(ex):
0279                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0280             else:
0281                 # raise ex
0282                 self.logger.error(ex)
0283                 self.logger.error(traceback.format_exc())
0284         return []
0285 
0286     def get_operation_requests(self):
0287         """
0288         Get running requests
0289         """
0290         try:
0291             if not self.is_ok_to_run_more_requests():
0292                 return []
0293 
0294             self.show_queue_size()
0295 
0296             status = [CommandStatus.New]
0297             new_commands = core_commands.get_commands_by_status(status=status, locking=True, period=self.new_command_poll_period)
0298             status = [CommandStatus.Processing]
0299             processing_commands = core_commands.get_commands_by_status(status=status, locking=True,
0300                                                                        period=self.update_command_poll_period)
0301             commands = new_commands + processing_commands
0302 
0303             # self.logger.debug("Main thread get %s commands" % len(commands))
0304             if commands:
0305                 self.logger.info("Main thread get %s commands" % len(commands))
0306 
0307             update_commands = []
0308             for cmd in commands:
0309                 request_id = cmd['request_id']
0310                 # cmd_content = cmd['cmd_content']
0311                 cmd_type = cmd['cmd_type']
0312                 cmd_status = cmd['status']
0313 
0314                 if BaseAgent.min_request_id is None or BaseAgent.min_request_id > request_id:
0315                     BaseAgent.min_request_id = request_id
0316                     self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
0317                     BaseAgent.min_request_id_cache[request_id] = time.time()
0318                     core_requests.set_min_request_id(BaseAgent.min_request_id)
0319 
0320                 if cmd_status in [CommandStatus.New, CommandStatus.Processing]:
0321                     req = self.get_request(request_id, status=None, locking=True)
0322                     if req:
0323                         if cmd_type in [CommandType.AbortRequest]:
0324                             self.submit(self.process_abort_request, **{"request": req, "command": cmd["cmd_id"]})
0325                         elif cmd_type in [CommandType.ResumeRequest]:
0326                             self.submit(self.process_resume_request, **{"request": req, "command": cmd["cmd_id"]})
0327                         elif cmd_type in [CommandType.CloseRequest]:
0328                             self.submit(self.process_close_request, **{"request": req, "command": cmd["cmd_id"]})
0329 
0330                         u_command = {'cmd_id': cmd['cmd_id'],
0331                                      'status': CommandStatus.Processing,
0332                                      'locking': CommandLocking.Idle}
0333                         update_commands.append(u_command)
0334                 else:
0335                     u_command = {'cmd_id': cmd['cmd_id'],
0336                                  'status': CommandStatus.UnknownCommand,
0337                                  'locking': CommandLocking.Idle}
0338                     update_commands.append(u_command)
0339                 core_commands.update_commands(update_commands)
0340             return commands
0341         except exceptions.DatabaseException as ex:
0342             if 'ORA-00060' in str(ex):
0343                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0344             else:
0345                 # raise ex
0346                 self.logger.error(ex)
0347                 self.logger.error(traceback.format_exc())
0348         return []
0349 
0350     def clean_min_request_id(self):
0351         try:
0352             if BaseAgent.checking_min_request_id_times <= 0:
0353                 old_min_request_id = core_requests.get_min_request_id()
0354                 self.logger.info("old_min_request_id: %s" % old_min_request_id)
0355                 if not old_min_request_id:
0356                     min_request_id = 0
0357                 else:
0358                     min_request_id = old_min_request_id - 1000
0359                 BaseAgent.min_request_id = min_request_id
0360             else:
0361                 for req_id in list(BaseAgent.min_request_id_cache.keys()):
0362                     time_stamp = BaseAgent.min_request_id_cache[req_id]
0363                     if time_stamp < time.time() - 12 * 3600:       # older than 12 hours
0364                         del BaseAgent.min_request_id_cache[req_id]
0365 
0366                 if BaseAgent.min_request_id_cache:
0367                     min_request_id = min(list(BaseAgent.min_request_id_cache.keys()))
0368                     BaseAgent.min_request_id = min_request_id
0369                     core_requests.set_min_request_id(BaseAgent.min_request_id)
0370 
0371             BaseAgent.checking_min_request_id_times += 1
0372         except Exception as ex:
0373             self.logger.error(ex)
0374             self.logger.error(traceback.format_exc())
0375 
0376     def get_request(self, request_id, status=None, locking=False):
0377         try:
0378             return core_requests.get_request_by_id_status(request_id=request_id, status=status, locking=locking)
0379         except exceptions.DatabaseException as ex:
0380             if 'ORA-00060' in str(ex):
0381                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0382             else:
0383                 # raise ex
0384                 self.logger.error(ex)
0385                 self.logger.error(traceback.format_exc())
0386         return None
0387 
0388     def load_poll_period(self, req, parameters, throttling=False):
0389         if self.new_poll_period and req['new_poll_period'] != self.new_poll_period:
0390             parameters['new_poll_period'] = self.new_poll_period
0391         if throttling:
0392             parameters['new_poll_period'] = self.throttle_poll_period
0393         if self.update_poll_period and req['update_poll_period'] != self.update_poll_period:
0394             parameters['update_poll_period'] = self.update_poll_period
0395         parameters['max_new_retries'] = req['max_new_retries'] if req['max_new_retries'] is not None else self.max_new_retries
0396         parameters['max_update_retries'] = req['max_update_retries'] if req['max_update_retries'] is not None else self.max_update_retries
0397         return parameters
0398 
0399     def get_work_tag_attribute(self, work_tag, attribute):
0400         work_tag_attribute_value = None
0401         if work_tag:
0402             work_tag_attribute = work_tag + "_" + attribute
0403             if hasattr(self, work_tag_attribute):
0404                 work_tag_attribute_value = int(getattr(self, work_tag_attribute))
0405         return work_tag_attribute_value
0406 
0407     def generate_transform(self, req, work, build=False, iworkflow=False):
0408         if iworkflow:
0409             wf = None
0410         else:
0411             if build:
0412                 wf = req['request_metadata']['build_workflow']
0413             else:
0414                 wf = req['request_metadata']['workflow']
0415 
0416         work.set_request_id(req['request_id'])
0417         work.username = req['username']
0418 
0419         transform_tag = work.get_work_tag()
0420         if req['max_new_retries']:
0421             max_new_retries = req['max_new_retries']
0422         else:
0423             work_tag_max_new_retries = self.get_work_tag_attribute(transform_tag, "max_new_retries")
0424             if work_tag_max_new_retries:
0425                 max_new_retries = work_tag_max_new_retries
0426             else:
0427                 max_new_retries = self.max_new_retries
0428 
0429         if req['max_update_retries']:
0430             max_update_retries = req['max_update_retries']
0431         else:
0432             work_tag_max_update_retries = self.get_work_tag_attribute(transform_tag, "max_update_retries")
0433             if work_tag_max_update_retries:
0434                 max_update_retries = work_tag_max_update_retries
0435             else:
0436                 max_update_retries = self.max_update_retries
0437 
0438         transform_type = TransformType.Workflow
0439         try:
0440             work_type = work.get_work_type()
0441             if build:
0442                 # todo
0443                 # transform_type = TransformType.BuildWork
0444                 pass
0445             elif work_type in [WorkflowType.iWorkflowLocal]:
0446                 # no need to generate transform
0447                 return None
0448             elif work_type in [WorkflowType.iWorkflow]:
0449                 transform_type = TransformType.iWorkflow
0450             elif work_type in [WorkflowType.iWork]:
0451                 transform_type = TransformType.iWork
0452             elif work_type in [WorkflowType.GenericWorkflow]:
0453                 transform_type = TransformType.GenericWorkflow
0454             elif work_type in [WorkflowType.GenericWork]:
0455                 transform_type = TransformType.GenericWork
0456         except Exception:
0457             pass
0458 
0459         has_previous_conditions = None
0460         try:
0461             if hasattr(work, 'get_previous_conditions'):
0462                 work_previous_conditions = work.get_previous_conditions()
0463             if work_previous_conditions:
0464                 has_previous_conditions = len(work_previous_conditions)
0465         except Exception:
0466             pass
0467 
0468         triggered_conditions = []
0469         untriggered_conditions = []
0470         try:
0471             if hasattr(work, 'get_following_conditions'):
0472                 following_conditions = work.get_following_conditions()
0473                 for cond in following_conditions:
0474                     untriggered_conditions.append(cond)
0475         except Exception:
0476             pass
0477 
0478         loop_index = None
0479         try:
0480             if hasattr(work, 'get_loop_index'):
0481                 loop_index = work.get_loop_index()
0482         except Exception:
0483             pass
0484 
0485         # transform_status = TransformStatus.New
0486         transform_status = TransformStatus.Queue
0487         if has_previous_conditions:
0488             transform_status = TransformStatus.WaitForTrigger
0489 
0490         site = req['site']
0491         if not site:
0492             try:
0493                 cloud = None
0494                 if hasattr(work, 'task_cloud') and work.task_cloud:
0495                     cloud = work.task_cloud
0496 
0497                 if hasattr(work, 'task_queue') and work.task_queue:
0498                     queue = work.task_queue
0499                 elif hasattr(work, 'queue') and work.queue:
0500                     queue = work.queue
0501                 else:
0502                     queue = None
0503 
0504                 task_site = None
0505                 if hasattr(work, 'task_site') and work.task_site:
0506                     task_site = work.task_site
0507                 site = f"{cloud},{task_site},{queue}"
0508             except Exception:
0509                 pass
0510 
0511         new_transform = {'request_id': req['request_id'],
0512                          'workload_id': req['workload_id'],
0513                          'transform_type': transform_type,
0514                          'transform_tag': work.get_work_tag(),
0515                          'priority': req['priority'],
0516                          'status': transform_status,
0517                          'retries': 0,
0518                          'parent_transform_id': None,
0519                          'previous_transform_id': None,
0520                          'name': work.get_work_name(),
0521                          'new_poll_period': self.new_poll_period,
0522                          'update_poll_period': self.update_poll_period,
0523                          'max_new_retries': max_new_retries,
0524                          'max_update_retries': max_update_retries,
0525                          # 'expired_at': req['expired_at'],
0526                          'expired_at': None,
0527                          'internal_id': work.internal_id,
0528                          'parent_internal_id': None if not work.parent_internal_ids else ",".join(work.parent_internal_ids),
0529                          'has_previous_conditions': has_previous_conditions,
0530                          'triggered_conditions': triggered_conditions,
0531                          'untriggered_conditions': untriggered_conditions,
0532                          'loop_index': loop_index,
0533                          'site': site,
0534                          'transform_metadata': {'internal_id': work.get_internal_id(),
0535                                                 'template_work_id': work.get_template_work_id(),
0536                                                 'sequence_id': work.get_sequence_id(),
0537                                                 'work_name': work.get_work_name(),
0538                                                 'work': work,
0539                                                 'workflow': wf}
0540                          # 'running_metadata': {'work_data': new_work.get_running_data()}
0541                          # 'collections': related_collections
0542                          }
0543 
0544         return new_transform
0545 
0546     def generate_condition(self, req, cond):
0547         previous_works = cond.previous_works
0548         following_works = cond.following_works
0549         previous_transforms, following_transforms = [], []
0550         previous_transforms = previous_works
0551         following_transforms = following_works
0552 
0553         new_condition = {'request_id': req['request_id'],
0554                          'internal_id': cond.internal_id,
0555                          'status': ConditionStatus.WaitForTrigger,
0556                          'substatus': None,
0557                          'is_loop': False,
0558                          'loop_index': None,
0559                          'cloned_from': None,
0560                          'evaluate_result': None,
0561                          'previous_transforms': previous_transforms,
0562                          'following_transforms': following_transforms,
0563                          'condition': {'condition': cond}}
0564         return new_condition
0565 
0566     def get_num_active_requests(self, site_name):
0567         cache = get_redis_cache()
0568         num_requests = cache.get("num_requests", default=None)
0569         if num_requests is None:
0570             num_requests = {}
0571             active_status = [RequestStatus.New, RequestStatus.Ready, RequestStatus.Throttling]
0572             active_status1 = [RequestStatus.Transforming, RequestStatus.Terminating]
0573             rets = core_requests.get_num_active_requests(active_status + active_status1)
0574             for ret in rets:
0575                 status, site, count = ret
0576                 if site is None:
0577                     site = 'Default'
0578                 if site not in num_requests:
0579                     num_requests[site] = {'new': 0, 'processing': 0}
0580                 if status in active_status:
0581                     num_requests[site]['new'] += count
0582                 elif status in active_status1:
0583                     num_requests[site]['processing'] += count
0584             cache.set("num_requests", num_requests, expire_seconds=self.cache_expire_seconds)
0585         default_value = {'new': 0, 'processing': 0}
0586         return num_requests.get(site_name, default_value)
0587 
0588     def get_num_active_transforms(self, site_name):
0589         cache = get_redis_cache()
0590         num_transforms = cache.get("num_transforms", default=None)
0591         if num_transforms is None:
0592             num_transforms = {}
0593             active_status = [TransformStatus.New, TransformStatus.Ready]
0594             active_status1 = [TransformStatus.Transforming, TransformStatus.Terminating]
0595             rets = core_transforms.get_num_active_transforms(active_status + active_status1)
0596             for ret in rets:
0597                 status, site, count = ret
0598                 if site is None:
0599                     site = 'Default'
0600                 if site not in num_transforms:
0601                     num_transforms[site] = {'new': 0, 'processing': 0}
0602                 if status in active_status:
0603                     num_transforms[site]['new'] += count
0604                 elif status in active_status1:
0605                     num_transforms[site]['processing'] += count
0606             cache.set("num_transforms", num_transforms, expire_seconds=self.cache_expire_seconds)
0607         default_value = {'new': 0, 'processing': 0}
0608         return num_transforms.get(site_name, default_value)
0609 
0610     def get_num_active_processings(self, site_name):
0611         cache = get_redis_cache()
0612         num_processings = cache.get("num_processings", default=None)
0613         active_transforms = cache.get("active_transforms", default={})
0614         if num_processings is None:
0615             num_processings = {}
0616             active_transforms = {}
0617             active_status = [ProcessingStatus.New]
0618             active_status1 = [ProcessingStatus.Submitting, ProcessingStatus.Submitted,
0619                               ProcessingStatus.Running, ProcessingStatus.Terminating, ProcessingStatus.ToTrigger,
0620                               ProcessingStatus.Triggering]
0621             rets = core_processings.get_active_processings(active_status + active_status1)
0622             for ret in rets:
0623                 req_id, trf_id, pr_id, site, status = ret
0624                 if site is None:
0625                     site = 'Default'
0626                 if site not in num_processings:
0627                     num_processings[site] = {'new': 0, 'processing': 0}
0628                     active_transforms[site] = []
0629                 if status in active_status:
0630                     num_processings[site]['new'] += 1
0631                 elif status in active_status1:
0632                     num_processings[site]['processing'] += 1
0633                 active_transforms[site].append(trf_id)
0634             cache.set("num_processings", num_processings, expire_seconds=self.cache_expire_seconds)
0635             cache.set("active_transforms", active_transforms, expire_seconds=self.cache_expire_seconds)
0636         default_value = {'new': 0, 'processing': 0}
0637         return num_processings.get(site_name, default_value), active_transforms
0638 
0639     def get_num_active_contents(self, site_name, active_transform_ids):
0640         cache = get_redis_cache()
0641         # 1. input contents not terminated
0642         # 2. output contents not terminated
0643         tf_id_site_map = {}
0644         all_tf_ids = []
0645         for site in active_transform_ids:
0646             all_tf_ids += active_transform_ids[site]
0647             for tf_id in active_transform_ids[site]:
0648                 tf_id_site_map[tf_id] = site
0649 
0650         num_input_contents = cache.get("num_input_contents", default=None)
0651         num_output_contents = cache.get("num_output_contents", default=None)
0652         if num_input_contents is None or num_output_contents is None:
0653             num_input_contents, num_output_contents = {}, {}
0654             if all_tf_ids:
0655                 ret = core_catalog.get_content_status_statistics_by_relation_type(all_tf_ids)
0656                 for item in ret:
0657                     status, relation_type, transform_id, count = item
0658                     site = tf_id_site_map[transform_id]
0659                     if site not in num_input_contents:
0660                         num_input_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
0661                         num_output_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
0662                     if status in [ContentStatus.New]:
0663                         if relation_type == ContentRelationType.Input:
0664                             num_input_contents[site]['new'] += count
0665                         elif relation_type == ContentRelationType.Output:
0666                             num_output_contents[site]['new'] += count
0667                     if status in [ContentStatus.Activated]:
0668                         if relation_type == ContentRelationType.Input:
0669                             num_input_contents[site]['activated'] += count
0670                         elif relation_type == ContentRelationType.Output:
0671                             num_output_contents[site]['activated'] += count
0672                     else:
0673                         if relation_type == ContentRelationType.Input:
0674                             num_input_contents[site]['processed'] += count
0675                         elif relation_type == ContentRelationType.Output:
0676                             num_output_contents[site]['processed'] += count
0677 
0678             cache.set("num_input_contents", num_input_contents, expire_seconds=self.cache_expire_seconds)
0679             cache.set("num_output_contents", num_output_contents, expire_seconds=self.cache_expire_seconds)
0680         default_value = {'new': 0, 'activated': 0, 'processed': 0}
0681         return num_input_contents.get(site_name, default_value), num_output_contents.get(site_name, default_value)
0682 
0683     def get_throttlers(self):
0684         cache = get_redis_cache()
0685         throttlers = cache.get("throttlers", default=None)
0686         if throttlers is None:
0687             throttler_items = core_throttlers.get_throttlers()
0688             throttlers = {}
0689             for item in throttler_items:
0690                 throttlers[item['site']] = {'num_requests': item['num_requests'],
0691                                             'num_transforms': item['num_transforms'],
0692                                             'num_processings': item['num_processings'],
0693                                             'new_contents': item['new_contents'],
0694                                             'queue_contents': item['queue_contents'],
0695                                             'others': item['others'],
0696                                             'status': item['status']}
0697             cache.set("throttlers", throttlers, expire_seconds=self.cache_expire_seconds)
0698         return throttlers
0699 
0700     def whether_to_throttle(self, request):
0701         # disable throttler in clerk. throttler will run in transformer
0702         return False
0703 
0704         try:
0705             site = request['site']
0706             if site is None:
0707                 site = 'Default'
0708             throttlers = self.get_throttlers()
0709             num_requests = self.get_num_active_requests(site)
0710             num_transforms = self.get_num_active_transforms(site)
0711             num_processings, active_transforms = self.get_num_active_processings(site)
0712             num_input_contents, num_output_contents = self.get_num_active_contents(site, active_transforms)
0713             self.logger.info("throttler(site: %s): active requests(%s), transforms(%s), processings(%s)" % (site, num_requests, num_transforms, num_processings))
0714             self.logger.info("throttler(site: %s): active input contents(%s), output contents(%s)" % (site, num_input_contents, num_output_contents))
0715 
0716             throttle_requests = throttlers.get(site, {}).get('num_requests', None)
0717             throttle_transforms = throttlers.get(site, {}).get('num_transforms', None)
0718             throttle_processings = throttlers.get(site, {}).get('num_processings', None)
0719             throttle_new_jobs = throttlers.get(site, {}).get('new_contents', None)
0720             throttle_queue_jobs = throttlers.get(site, {}).get('queue_contents', None)
0721             self.logger.info("throttler(site: %s): throttle_requests %s, throttle_transforms: %s, throttle_processings: %s" % (site, throttle_requests, throttle_transforms, throttle_processings))
0722             if throttle_requests:
0723                 if num_requests['processing'] >= throttle_requests:
0724                     self.logger.info("throttler(site: %s): num of processing requests (%s) is bigger than throttle_requests (%s), set throttling" % (site, num_requests['processing'], throttle_requests))
0725                     return True
0726             if throttle_transforms:
0727                 if num_transforms['processing'] >= throttle_transforms:
0728                     self.logger.info("throttler(site: %s): num of processing transforms (%s) is bigger than throttle_transforms (%s), set throttling" % (site, num_transforms['processing'], throttle_transforms))
0729                     return True
0730             if throttle_processings:
0731                 if num_processings['processing'] >= throttle_processings:
0732                     self.logger.info("throttler(site: %s): num of processing processings (%s) is bigger than throttle_processings (%s), set throttling" % (site, num_processings['processing'], throttle_processings))
0733                     return True
0734 
0735             new_jobs = num_input_contents['new']
0736             released_jobs = num_input_contents['processed']
0737             terminated_jobs = num_output_contents['processed']
0738             queue_jobs = released_jobs - terminated_jobs
0739 
0740             self.logger.info("throttler(site: %s): throttle_new_jobs: %s, throttle_queue_jobs: %s" % (site, throttle_new_jobs, throttle_queue_jobs))
0741             self.logger.info("throttler(site: %s): new_jobs: %s, queue_jobs: %s" % (site, new_jobs, queue_jobs))
0742             if throttle_new_jobs:
0743                 if new_jobs >= throttle_new_jobs:
0744                     self.logger.info("throttler(site: %s): num of new jobs(not released) (%s) is bigger than throttle_new_jobs (%s), set throttling" % (site, new_jobs, throttle_new_jobs))
0745                     return True
0746             if throttle_queue_jobs:
0747                 if queue_jobs >= throttle_queue_jobs:
0748                     self.logger.info("throttler(site: %s): num of queue jobs(released but not terminated) (%s) is bigger than throttle_queue_jobs (%s), set throttling" % (site, queue_jobs, throttle_queue_jobs))
0749                     return True
0750 
0751             return False
0752         except Exception as ex:
0753             self.logger.error("whether_to_throttle: %s" % str(ex))
0754             self.logger.error(traceback.format_exc())
0755         return False
0756 
0757     def get_log_prefix(self, req):
0758         return "<request_id=%s>" % req['request_id']
0759 
0760     def handle_new_request(self, req):
0761         try:
0762             log_pre = self.get_log_prefix(req)
0763             self.logger.info(log_pre + "Handle new request")
0764             to_throttle = self.whether_to_throttle(req)
0765             if to_throttle:
0766                 ret_req = {'request_id': req['request_id'],
0767                            'parameters': {'status': RequestStatus.Throttling,
0768                                           'locking': RequestLocking.Idle}}
0769                 ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'], throttling=True)
0770                 self.logger.info(log_pre + "Throttle new request result: %s" % str(ret_req))
0771             else:
0772                 workflow = req['request_metadata']['workflow']
0773 
0774                 # wf = workflow.copy()
0775                 wf = workflow
0776                 works = wf.get_new_works()
0777                 transforms = []
0778                 for work in works:
0779                     # new_work = work.copy()
0780                     new_work = work
0781                     new_work.add_proxy(wf.get_proxy())
0782                     # new_work.set_request_id(req['request_id'])
0783                     # new_work.create_processing()
0784 
0785                     transform = self.generate_transform(req, work)
0786                     if transform:
0787                         transforms.append(transform)
0788                 self.logger.debug(log_pre + "Processing request(%s): new transforms: %s" % (req['request_id'],
0789                                                                                             str(transforms)))
0790                 # processing_metadata = req['processing_metadata']
0791                 # processing_metadata = {'workflow_data': wf.get_running_data()}
0792 
0793                 ret_req = {'request_id': req['request_id'],
0794                            'parameters': {'status': RequestStatus.Transforming,
0795                                           'locking': RequestLocking.Idle,
0796                                           # 'processing_metadata': processing_metadata,
0797                                           'request_metadata': req['request_metadata']},
0798                            'new_transforms': transforms}
0799                 ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'])
0800                 self.logger.info(log_pre + "Handle new request result: %s" % str(ret_req))
0801         except Exception as ex:
0802             self.logger.error(ex)
0803             self.logger.error(traceback.format_exc())
0804             retries = req['new_retries'] + 1
0805             if not req['max_new_retries'] or retries < req['max_new_retries']:
0806                 req_status = req['status']
0807             else:
0808                 req_status = RequestStatus.Failed
0809 
0810             # increase poll period
0811             new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0812             if new_poll_period > self.max_new_poll_period:
0813                 new_poll_period = self.max_new_poll_period
0814 
0815             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0816 
0817             ret_req = {'request_id': req['request_id'],
0818                        'parameters': {'status': req_status,
0819                                       'locking': RequestLocking.Idle,
0820                                       'new_retries': retries,
0821                                       'new_poll_period': new_poll_period,
0822                                       'errors': req['errors'] if req['errors'] else {}}}
0823             ret_req['parameters']['errors'].update(error)
0824             self.logger.warn(log_pre + "Handle new request error result: %s" % str(ret_req))
0825         return ret_req
0826 
0827     def handle_new_irequest(self, req):
0828         try:
0829             log_pre = self.get_log_prefix(req)
0830             self.logger.info(log_pre + "Handle new irequest")
0831             to_throttle = self.whether_to_throttle(req)
0832             if to_throttle:
0833                 ret_req = {'request_id': req['request_id'],
0834                            'parameters': {'status': RequestStatus.Throttling,
0835                                           'locking': RequestLocking.Idle}}
0836                 ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'], throttling=True)
0837                 self.logger.info(log_pre + "Throttle new irequest result: %s" % str(ret_req))
0838             else:
0839                 workflow = req['request_metadata']['workflow']
0840 
0841                 transforms = []
0842                 transform = self.generate_transform(req, workflow)
0843                 if transform:
0844                     transforms.append(transform)
0845                 self.logger.debug(log_pre + "Processing request(%s): new transforms: %s" % (req['request_id'],
0846                                                                                             str(transforms)))
0847                 ret_req = {'request_id': req['request_id'],
0848                            'parameters': {'status': RequestStatus.Transforming,
0849                                           'locking': RequestLocking.Idle},
0850                            'new_transforms': transforms}
0851                 ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'])
0852                 self.logger.info(log_pre + "Handle new irequest result: %s" % str(ret_req))
0853         except Exception as ex:
0854             self.logger.error(ex)
0855             self.logger.error(traceback.format_exc())
0856             retries = req['new_retries'] + 1
0857             if not req['max_new_retries'] or retries < req['max_new_retries']:
0858                 req_status = req['status']
0859             else:
0860                 req_status = RequestStatus.Failed
0861 
0862             # increase poll period
0863             new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0864             if new_poll_period > self.max_new_poll_period:
0865                 new_poll_period = self.max_new_poll_period
0866 
0867             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0868 
0869             ret_req = {'request_id': req['request_id'],
0870                        'parameters': {'status': req_status,
0871                                       'locking': RequestLocking.Idle,
0872                                       'new_retries': retries,
0873                                       'new_poll_period': new_poll_period,
0874                                       'errors': req['errors'] if req['errors'] else {}}}
0875             ret_req['parameters']['errors'].update(error)
0876             self.logger.warn(log_pre + "Handle new irequest error result: %s" % str(ret_req))
0877         return ret_req
0878 
0879     def handle_new_generic_request(self, req):
0880         try:
0881             log_pre = self.get_log_prefix(req)
0882             self.logger.info(log_pre + "Handle new generic request")
0883             to_throttle = self.whether_to_throttle(req)
0884             if to_throttle:
0885                 ret_req = {'request_id': req['request_id'],
0886                            'parameters': {'status': RequestStatus.Throttling,
0887                                           'locking': RequestLocking.Idle}}
0888                 ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'], throttling=True)
0889                 self.logger.info(log_pre + "Throttle new generic request result: %s" % str(ret_req))
0890             else:
0891                 workflow = req['request_metadata']['workflow']
0892 
0893                 transforms = []
0894                 works = workflow.get_works()
0895                 for w in works:
0896                     # todo
0897                     # set has_previous_conditions, has_conditions, all_conditions_triggered(False), cloned_from(None)
0898                     transform = self.generate_transform(req, w)
0899                     if transform:
0900                         transforms.append(transform)
0901                 self.logger.debug(log_pre + f"Processing request({req['request_id']}): new transforms: {transforms}")
0902 
0903                 conds = workflow.get_conditions()
0904                 conditions = []
0905                 for cond in conds:
0906                     condition = self.generate_condition(req, cond)
0907                     if condition:
0908                         conditions.append(condition)
0909                 self.logger.debug(log_pre + f"Processing request({req['request_id']}), new conditions: {conditions}")
0910 
0911                 ret_req = {'request_id': req['request_id'],
0912                            'parameters': {'status': RequestStatus.Transforming,
0913                                           'locking': RequestLocking.Idle},
0914                            'new_transforms': transforms,
0915                            'new_conditions': conditions}
0916                 ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'])
0917                 self.logger.info(log_pre + "Handle new generic request result: %s" % str(ret_req))
0918         except Exception as ex:
0919             self.logger.error(ex)
0920             self.logger.error(traceback.format_exc())
0921             retries = req['new_retries'] + 1
0922             if not req['max_new_retries'] or retries < req['max_new_retries']:
0923                 req_status = req['status']
0924             else:
0925                 req_status = RequestStatus.Failed
0926 
0927             # increase poll period
0928             new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0929             if new_poll_period > self.max_new_poll_period:
0930                 new_poll_period = self.max_new_poll_period
0931 
0932             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0933 
0934             ret_req = {'request_id': req['request_id'],
0935                        'parameters': {'status': req_status,
0936                                       'locking': RequestLocking.Idle,
0937                                       'new_retries': retries,
0938                                       'new_poll_period': new_poll_period,
0939                                       'errors': req['errors'] if req['errors'] else {}}}
0940             ret_req['parameters']['errors'].update(error)
0941             self.logger.warn(log_pre + "Handle new irequest error result: %s" % str(ret_req))
0942         return ret_req
0943 
0944     def has_to_build_work(self, req):
0945         try:
0946             if req['status'] in [RequestStatus.New] and 'build_workflow' in req['request_metadata']:
0947                 log_pre = self.get_log_prefix(req)
0948                 self.logger.info(log_pre + "has build work")
0949                 return True
0950                 # workflow = req['request_metadata']['build_workflow']
0951                 # if workflow.has_to_build_work():
0952                 #     log_pre = self.get_log_prefix(req)
0953                 #     self.logger.info(log_pre + "has to_build work")
0954                 #     return True
0955         except Exception as ex:
0956             self.logger.error(ex)
0957             self.logger.error(traceback.format_exc())
0958         return False
0959 
0960     def handle_build_request(self, req):
0961         try:
0962             log_pre = self.get_log_prefix(req)
0963             self.logger.info(log_pre + "handle build request")
0964 
0965             workflow = req['request_metadata']['build_workflow']
0966             works = workflow.get_new_works()
0967             transforms = []
0968             for work in works:
0969                 new_work = work
0970                 new_work.add_proxy(workflow.get_proxy())
0971                 transform = self.generate_transform(req, new_work, build=True)
0972                 transforms.append(transform)
0973             self.logger.debug(log_pre + "Processing request(%s): new build transforms: %s" % (req['request_id'],
0974                                                                                               str(transforms)))
0975 
0976             ret_req = {'request_id': req['request_id'],
0977                        'parameters': {'status': RequestStatus.Building,
0978                                       'locking': RequestLocking.Idle,
0979                                       # 'processing_metadata': processing_metadata,
0980                                       'request_metadata': req['request_metadata']},
0981                        'new_transforms': transforms}
0982             ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'])
0983             self.logger.info(log_pre + "Handle build request result: %s" % str(ret_req))
0984         except Exception as ex:
0985             self.logger.error(ex)
0986             self.logger.error(traceback.format_exc())
0987             retries = req['new_retries'] + 1
0988             if not req['max_new_retries'] or retries < req['max_new_retries']:
0989                 req_status = req['status']
0990             else:
0991                 req_status = RequestStatus.Failed
0992 
0993             # increase poll period
0994             new_poll_period = int(req['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0995             if new_poll_period > self.max_new_poll_period:
0996                 new_poll_period = self.max_new_poll_period
0997 
0998             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0999 
1000             ret_req = {'request_id': req['request_id'],
1001                        'parameters': {'status': req_status,
1002                                       'locking': RequestLocking.Idle,
1003                                       'new_retries': retries,
1004                                       'new_poll_period': new_poll_period,
1005                                       'errors': req['errors'] if req['errors'] else {}}}
1006             ret_req['parameters']['errors'].update(error)
1007             self.logger.warn(log_pre + "Handle build request error result: %s" % str(ret_req))
1008         return ret_req
1009 
1010     def update_request(self, req, origin_req=None):
1011         new_tf_ids, update_tf_ids = [], []
1012         try:
1013             log_pre = self.get_log_prefix(req)
1014             self.logger.info(log_pre + "update request: %s" % req)
1015             req['parameters']['locking'] = RequestLocking.Idle
1016             req['parameters']['updated_at'] = datetime.datetime.utcnow()
1017 
1018             if 'new_transforms' in req:
1019                 new_transforms = req['new_transforms']
1020             else:
1021                 new_transforms = []
1022 
1023             if 'update_transforms' in req:
1024                 update_transforms = req['update_transforms']
1025             else:
1026                 update_transforms = {}
1027 
1028             if 'new_conditions' in req:
1029                 new_conditions = req['new_conditions']
1030             else:
1031                 new_conditions = []
1032 
1033             if origin_req:
1034                 origin_status = origin_req['status']
1035             else:
1036                 origin_status = None
1037 
1038             retry = True
1039             retry_num = 0
1040             while retry:
1041                 retry = False
1042                 retry_num += 1
1043                 try:
1044                     _, new_tf_ids, update_tf_ids = core_requests.update_request_with_transforms(req['request_id'], req['parameters'],
1045                                                                                                 origin_status=origin_status,
1046                                                                                                 new_transforms=new_transforms,
1047                                                                                                 update_transforms=update_transforms,
1048                                                                                                 new_conditions=new_conditions)
1049                 except exceptions.DatabaseException as ex:
1050                     if 'ORA-00060' in str(ex):
1051                         self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
1052                         if retry_num < 5:
1053                             retry = True
1054                             if retry_num <= 1:
1055                                 random_sleep = random.randint(1, 10)
1056                             elif retry_num <= 2:
1057                                 random_sleep = random.randint(1, 60)
1058                             else:
1059                                 random_sleep = random.randint(1, 120)
1060                             time.sleep(random_sleep)
1061                         else:
1062                             raise ex
1063                     else:
1064                         # self.logger.error(ex)
1065                         # self.logger.error(traceback.format_exc())
1066                         raise ex
1067         except Exception as ex:
1068             self.logger.error(ex)
1069             self.logger.error(traceback.format_exc())
1070             try:
1071                 req_parameters = {'status': RequestStatus.Transforming,
1072                                   'locking': RequestLocking.Idle}
1073                 if 'new_retries' in req['parameters']:
1074                     req_parameters['new_retries'] = req['parameters']['new_retries']
1075                 if 'update_retries' in req['parameters']:
1076                     req_parameters['update_retries'] = req['parameters']['update_retries']
1077                 if 'errors' in req['parameters']:
1078                     req_parameters['errors'] = req['parameters']['errors']
1079 
1080                 if origin_req:
1081                     origin_status = origin_req['status']
1082                 else:
1083                     origin_status = None
1084 
1085                 self.logger.warn(log_pre + "Update request in exception: %s" % str(req_parameters))
1086                 core_requests.update_request_with_transforms(req['request_id'], req_parameters, origin_status=origin_status)
1087             except Exception as ex:
1088                 self.logger.error(ex)
1089                 self.logger.error(traceback.format_exc())
1090         return new_tf_ids, update_tf_ids
1091 
1092     def process_new_request(self, event=None, request=None):
1093         self.number_workers += 1
1094         try:
1095             if request is None and event:
1096                 # req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built]
1097                 req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built, RequestStatus.Throttling]
1098                 req = self.get_request(request_id=event._request_id, status=req_status, locking=True)
1099                 if not req:
1100                     self.logger.error("Cannot find request for event: %s" % str(event))
1101                 request = req
1102 
1103             if request:
1104                 req = request
1105                 log_pre = self.get_log_prefix(req)
1106                 self.logger.info(f"{log_pre} process_new_request request: {req['request_id']} event: {event}")
1107                 if self.has_to_build_work(req):
1108                     ret = self.handle_build_request(req)
1109                 elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1110                     ret = self.handle_new_irequest(req)
1111                 elif req['request_type'] in [RequestType.GenericWorkflow]:
1112                     ret = self.handle_new_generic_request(req)
1113                 else:
1114                     ret = self.handle_new_request(req)
1115                 new_tf_ids, update_tf_ids = self.update_request(ret, origin_req=req)
1116                 for tf_id in new_tf_ids:
1117                     # self.logger.info(log_pre + "NewTransformEvent(transform_id: %s)" % str(tf_id))
1118                     # event = NewTransformEvent(publisher_id=self.id, transform_id=tf_id)
1119                     self.logger.info(log_pre + "QueueTransformEvent(transform_id: %s)" % str(tf_id))
1120                     event = QueueTransformEvent(publisher_id=self.id, transform_id=tf_id)
1121                     self.event_bus.send(event)
1122                     # time.sleep(1)
1123                 for tf_id in update_tf_ids:
1124                     self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % str(tf_id))
1125                     event = UpdateTransformEvent(publisher_id=self.id, transform_id=tf_id)
1126                     self.event_bus.send(event)
1127         except Exception as ex:
1128             self.logger.error(ex)
1129             self.logger.error(traceback.format_exc())
1130         self.number_workers -= 1
1131 
1132     def get_workflow_status(self, wf, tf_statuses, has_new_transforms, to_abort):
1133         if has_new_transforms:
1134             return RequestStatus.Transforming
1135         terminated_status = [
1136             TransformStatus.Finished,
1137             TransformStatus.SubFinished,
1138             TransformStatus.Failed,
1139             TransformStatus.Cancelled,
1140             TransformStatus.Suspended,
1141             TransformStatus.Expired,
1142             TransformStatus.Built
1143         ]
1144         finished_status = [TransformStatus.Finished]
1145         failed_status = [
1146             TransformStatus.Failed,
1147             TransformStatus.Cancelled,
1148             TransformStatus.Suspended,
1149             TransformStatus.Expired,
1150         ]
1151 
1152         all_terminated = all(status in terminated_status for status in tf_statuses)
1153         all_finished = all(status in finished_status for status in tf_statuses)
1154         all_failed = all(status in failed_status for status in tf_statuses)
1155 
1156         if all_finished:
1157             return RequestStatus.Finished
1158         elif all_failed:
1159             return RequestStatus.Failed
1160         elif all_terminated:
1161             if to_abort:
1162                 return RequestStatus.Cancelled
1163             return RequestStatus.SubFinished
1164         return RequestStatus.Transforming
1165 
    def handle_update_request_real(self, req, event):
        """
        Process a running (non-build) request.

        Synchronizes every work of the workflow with its transform, generates
        transforms for new works, computes the new request status and, when the
        request is still transforming but past its expiry, sends an ExpireRequestEvent.

        :param req: request dict; this method reads 'request_id', 'request_metadata',
                    'status' and 'expired_at'.
        :param event: the triggering event or None; its `_content` may carry
                      'cmd_type' and 'cmd_content' (with an optional 'transform_id').

        :returns: dict with 'request_id', 'parameters' (status, locking, request_metadata,
                  post-processed by load_poll_period) and 'new_transforms'.
        """
        log_pre = self.get_log_prefix(req)
        self.logger.info(log_pre + " handle_update_request: request_id: %s" % req['request_id'])
        # Fall back to the build workflow when the request carries no 'workflow' entry.
        if 'workflow' in req['request_metadata']:
            wf = req['request_metadata']['workflow']
        else:
            wf = req['request_metadata']['build_workflow']

        # Detect an abort/expire command in the event and, optionally, the single
        # transform it is targeted at.
        to_abort = False
        to_abort_transform_id = None
        if (event and event._content and 'cmd_type' in event._content and event._content['cmd_type']
            and event._content['cmd_type'] in [CommandType.AbortRequest, CommandType.ExpireRequest]):    # noqa W503
            to_abort = True
            self.logger.info(log_pre + "to_abort: %s" % to_abort)
            if (event and event._content and 'cmd_content' in event._content and event._content['cmd_content']
                and 'transform_id' in event._content['cmd_content']):                                    # noqa W503
                to_abort_transform_id = event._content['cmd_content']['transform_id']
                self.logger.info(log_pre + "to_abort_transform_id: %s" % to_abort_transform_id)

        # Only an abort without a target transform cancels the whole workflow.
        if to_abort and not to_abort_transform_id:
            wf.to_cancel = True

        # current works
        works = wf.get_all_works()
        # print(works)
        # One entry per work: the transform status, or None when no unique transform
        # could be matched. None is not a terminal status for get_workflow_status.
        all_released_work_status = []
        for work in works:
            # print(work.get_work_id())
            found_match_works = False
            # First try: the work already knows its transform id.
            if work.get_work_id():
                tf = core_transforms.get_transform(transform_id=work.get_work_id(), request_id=req['request_id'])
                if tf:
                    all_released_work_status.append(tf['status'])
                    transform_work = tf['transform_metadata']['work']
                    # work_status = WorkStatus(tf['status'].value)
                    # work.set_status(work_status)
                    work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
                    self.logger.info(log_pre + "transform status: %s, work status: %s" % (tf['status'], work.status))
                    found_match_works = True
            # Second try: look the transform up by internal id and loop index.
            if not found_match_works:
                tfs = core_transforms.get_transforms(request_id=req['request_id'], internal_ids=work.internal_id, loop_index=work.get_loop_index())
                if not tfs:
                    self.logger.info(f"{log_pre} Found transforms with request_id {req['request_id']} internal_ids [{work.internal_id}]: {tfs}")
                else:
                    tf_ids = [tf['transform_id'] for tf in tfs]
                    self.logger.info(f"{log_pre} Found transforms with request_id {req['request_id']} internal_ids [{work.internal_id}]: {tf_ids}")
                # Only a unique match is used to bind the work to a transform.
                if len(tfs) == 1:
                    tf = tfs[0]
                    all_released_work_status.append(tf['status'])
                    transform_work = tf['transform_metadata']['work']
                    if transform_work.internal_id == work.internal_id:
                        if hasattr(work, 'set_work_id'):
                            work.set_work_id(tf['transform_id'], transforming=True)
                        work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
                        self.logger.info(log_pre + "transform status: %s, work status: %s" % (tf['status'], work.status))
                    else:
                        # NOTE(review): in this branch both tf['status'] (above) and None
                        # are appended for the same work - confirm this is intended.
                        all_released_work_status.append(None)
                else:
                    all_released_work_status.append(None)

        wf.refresh_works(clean=True)

        new_transforms = []
        self.logger.info(log_pre + f"request status: {req['status']} and to_cancel: {wf.to_cancel}")
        # if req['status'] in [RequestStatus.Transforming] and not wf.to_cancel:
        # To let the last final task to be submitted
        # if req['status'] in [RequestStatus.Transforming]:
        has_new_transforms = False
        # The status/to_cancel condition was deliberately replaced by an unconditional
        # branch (see the commented-out conditions above).
        if True:
            # new works
            works = wf.get_new_works()
            for work in works:
                tfs = core_transforms.get_transforms(request_id=req['request_id'], internal_ids=work.internal_id, loop_index=work.get_loop_index())
                if not tfs:
                    self.logger.info(f"{log_pre} Found transforms for new work with request_id {req['request_id']} internal_ids [{work.internal_id}]: {tfs}")
                else:
                    tf_ids = [tf['transform_id'] for tf in tfs]
                    self.logger.info(f"{log_pre} Found transforms for new work with request_id {req['request_id']} internal_ids [{work.internal_id}]: {tf_ids}")
                # Exactly one existing transform: re-attach the work to it instead of
                # creating a duplicate. Any other count generates a new transform.
                if len(tfs) == 1:
                    tf = tfs[0]
                    transform_work = tf['transform_metadata']['work']
                    if transform_work.internal_id == work.internal_id:
                        if hasattr(work, 'set_work_id'):
                            work.set_work_id(tf['transform_id'], transforming=True)
                        work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
                        self.logger.info(log_pre + "transform status: %s, work status: %s" % (tf['status'], work.status))
                else:
                    has_new_transforms = True
                    # new_work = work.copy()
                    new_work = work
                    new_work.add_proxy(wf.get_proxy())
                    new_transform = self.generate_transform(req, new_work)
                    new_transforms.append(new_transform)
            self.logger.debug(log_pre + " Processing request(%s): new transforms: %s" % (req['request_id'], str(new_transforms)))

        req_status = self.get_workflow_status(wf, all_released_work_status, has_new_transforms, to_abort)
        # NOTE: the bare string literal below is a no-op expression statement; it holds
        # the previous, workflow-based status computation and is never executed.
        """
        if wf.is_terminated():
            if wf.is_finished(synchronize=False):
                req_status = RequestStatus.Finished
            else:
                if to_abort and not to_abort_transform_id:
                    req_status = RequestStatus.Cancelled
                elif wf.is_expired(synchronize=False):
                    req_status = RequestStatus.Expired
                elif wf.is_subfinished(synchronize=False):
                    req_status = RequestStatus.SubFinished
                elif wf.is_failed(synchronize=False):
                    req_status = RequestStatus.Failed
                else:
                    req_status = RequestStatus.Failed

            # req_msg = wf.get_terminated_msg()
        """
        # A request that is still transforming is checked for expiry; the expiry itself
        # is carried out by whoever consumes the ExpireRequestEvent.
        if req_status in [RequestStatus.Transforming]:
            if wf.is_to_expire(req['expired_at'], self.pending_time, request_id=req['request_id']):
                wf.expired = True
                event_content = {'request_id': req['request_id'],
                                 'cmd_type': CommandType.ExpireRequest,
                                 'cmd_content': {}}
                self.logger.debug(log_pre + "ExpireRequestEvent(request_id: %s)" % req['request_id'])
                event = ExpireRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event_content)
                self.event_bus.send(event)

        # The (possibly modified) workflow is written back through request_metadata.
        parameters = {'status': req_status,
                      'locking': RequestLocking.Idle,
                      'request_metadata': req['request_metadata']
                      }
        parameters = self.load_poll_period(req, parameters)

        ret = {'request_id': req['request_id'],
               'parameters': parameters,
               'new_transforms': new_transforms}   # 'update_transforms': update_transforms}
        self.logger.info(log_pre + "Handle update request result: %s" % str(ret))
        return ret
1304 
1305     def handle_update_build_request_real(self, req, event):
1306         """
1307         process build request
1308         """
1309         log_pre = self.get_log_prefix(req)
1310         self.logger.info(log_pre + " handle_update_build_request: request_id: %s" % req['request_id'])
1311         wf = req['request_metadata']['build_workflow']
1312 
1313         to_abort = False
1314         to_abort_transform_id = None
1315         if (event and event._content and 'cmd_type' in event._content and event._content['cmd_type']
1316             and event._content['cmd_type'] in [CommandType.AbortRequest, CommandType.ExpireRequest]):    # noqa W503
1317             to_abort = True
1318             self.logger.info(log_pre + "to_abort: %s" % to_abort)
1319             if (event and event._content and 'cmd_content' in event._content and event._content['cmd_content']
1320                 and 'transform_id' in event._content['cmd_content']):                                    # noqa W503
1321                 to_abort_transform_id = event._content['cmd_content']['transform_id']
1322                 self.logger.info(log_pre + "to_abort_transform_id: %s" % to_abort_transform_id)
1323 
1324         if to_abort and not to_abort_transform_id:
1325             wf.to_cancel = True
1326 
1327         # current works
1328         works = wf.get_all_works()
1329         # print(works)
1330         finished_build_transforms = []
1331         for work in works:
1332             # print(work.get_work_id())
1333             tf = core_transforms.get_transform(transform_id=work.get_work_id())
1334             if tf:
1335                 transform_work = tf['transform_metadata']['work']
1336                 # work_status = WorkStatus(tf['status'].value)
1337                 # work.set_status(work_status)
1338                 work.sync_work_data(status=tf['status'], substatus=tf['substatus'], work=transform_work, workload_id=tf['workload_id'])
1339                 self.logger.info(log_pre + "transform status: %s, work status: %s" % (tf['status'], work.status))
1340                 if tf['status'] in [TransformStatus.Finished]:
1341                     finished_build_transforms.append(tf['transform_id'])
1342         wf.refresh_works()
1343 
1344         new_transforms = []
1345         if req['status'] in [RequestStatus.Building] and not wf.to_cancel:
1346             # new works
1347             works = wf.get_new_works()
1348             for work in works:
1349                 # new_work = work.copy()
1350                 new_work = work
1351                 new_work.add_proxy(wf.get_proxy())
1352                 new_transform = self.generate_transform(req, new_work, build=True)
1353                 new_transforms.append(new_transform)
1354             self.logger.debug(log_pre + " Processing build request(%s): new transforms: %s" % (req['request_id'], str(new_transforms)))
1355 
1356         req_status = RequestStatus.Building
1357         if wf.is_terminated():
1358             if wf.is_finished(synchronize=False):
1359                 if finished_build_transforms:
1360                     req_status = RequestStatus.Built
1361                 else:
1362                     req_status = RequestStatus.Failed
1363             else:
1364                 if to_abort and not to_abort_transform_id:
1365                     req_status = RequestStatus.Cancelled
1366                 elif wf.is_expired(synchronize=False):
1367                     req_status = RequestStatus.Expired
1368                 elif wf.is_subfinished(synchronize=False):
1369                     req_status = RequestStatus.SubFinished
1370                 elif wf.is_failed(synchronize=False):
1371                     req_status = RequestStatus.Failed
1372                 else:
1373                     req_status = RequestStatus.Failed
1374             # req_msg = wf.get_terminated_msg()
1375         else:
1376             if wf.is_to_expire(req['expired_at'], self.pending_time, request_id=req['request_id']):
1377                 wf.expired = True
1378                 event_content = {'request_id': req['request_id'],
1379                                  'cmd_type': CommandType.ExpireRequest,
1380                                  'cmd_content': {}}
1381                 self.logger.debug(log_pre + "ExpireRequestEvent(request_id: %s)" % req['request_id'])
1382                 event = ExpireRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event_content)
1383                 self.event_bus.send(event)
1384 
1385         parameters = {'status': req_status,
1386                       'locking': RequestLocking.Idle,
1387                       'request_metadata': req['request_metadata']
1388                       }
1389         parameters = self.load_poll_period(req, parameters)
1390 
1391         ret = {'request_id': req['request_id'],
1392                'parameters': parameters,
1393                'new_transforms': new_transforms}   # 'update_transforms': update_transforms}
1394         self.logger.info(log_pre + "Handle update request result: %s" % str(ret))
1395         return ret
1396 
1397     def handle_update_request(self, req, event):
1398         """
1399         process running request
1400         """
1401         try:
1402             # if self.release_helper:
1403             #     self.release_inputs(req['request_id'])
1404             if req['status'] in [RequestStatus.Building]:
1405                 ret_req = self.handle_update_build_request_real(req, event=event)
1406             else:
1407                 ret_req = self.handle_update_request_real(req, event)
1408         except Exception as ex:
1409             self.logger.error(ex)
1410             self.logger.error(traceback.format_exc())
1411             retries = req['update_retries'] + 1
1412             if not req['max_update_retries'] or retries < req['max_update_retries']:
1413                 req_status = req['status']
1414             else:
1415                 req_status = RequestStatus.Failed
1416             error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1417 
1418             # increase poll period
1419             update_poll_period = int(req['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1420             if update_poll_period > self.max_update_poll_period:
1421                 update_poll_period = self.max_update_poll_period
1422 
1423             ret_req = {'request_id': req['request_id'],
1424                        'parameters': {'status': req_status,
1425                                       'locking': RequestLocking.Idle,
1426                                       'update_retries': retries,
1427                                       'update_poll_period': update_poll_period,
1428                                       'errors': req['errors'] if req['errors'] else {}}}
1429             ret_req['parameters']['errors'].update(error)
1430             log_pre = self.get_log_prefix(req)
1431             self.logger.warn(log_pre + "Handle update request exception result: %s" % str(ret_req))
1432         return ret_req
1433 
1434     def is_to_expire(self, expired_at=None, pending_time=None, request_id=None):
1435         if expired_at:
1436             if type(expired_at) in [str]:
1437                 expired_at = str_to_date(expired_at)
1438             if expired_at < datetime.datetime.utcnow():
1439                 self.logger.info("Request(%s) expired_at(%s) is smaller than utc now(%s), expiring" % (request_id,
1440                                                                                                        expired_at,
1441                                                                                                        datetime.datetime.utcnow()))
1442                 return True
1443         return False
1444 
1445     def handle_update_irequest_real(self, req, event):
1446         """
1447         process running request
1448         """
1449         log_pre = self.get_log_prefix(req)
1450         self.logger.info(log_pre + " handle_update_irequest: request_id: %s" % req['request_id'])
1451 
1452         tfs = core_transforms.get_transforms(request_id=req['request_id'])
1453         total_tfs, finished_tfs, subfinished_tfs, failed_tfs = 0, 0, 0, 0
1454         for tf in tfs:
1455             total_tfs += 1
1456             if tf['status'] in [TransformStatus.Finished, TransformStatus.Built]:
1457                 finished_tfs += 1
1458             elif tf['status'] in [TransformStatus.SubFinished]:
1459                 subfinished_tfs += 1
1460             elif tf['status'] in [TransformStatus.Failed, TransformStatus.Cancelled,
1461                                   TransformStatus.Suspended, TransformStatus.Expired]:
1462                 failed_tfs += 1
1463 
1464         req_status = RequestStatus.Transforming
1465         if req['request_type'] in [RequestType.iWorkflowLocal]:
1466             workflow = req['request_metadata'].get('workflow', None)
1467             if workflow and req['created_at'] + datetime.timedelta(seconds=workflow.max_walltime) < datetime.datetime.utcnow():
1468                 req_status = RequestStatus.Finished
1469         else:
1470             if total_tfs == finished_tfs:
1471                 req_status = RequestStatus.Finished
1472             elif total_tfs == finished_tfs + subfinished_tfs + failed_tfs:
1473                 if finished_tfs + subfinished_tfs > 0:
1474                     req_status = RequestStatus.SubFinished
1475                 else:
1476                     req_status = RequestStatus.Failed
1477 
1478         log_msg = log_pre + "ireqeust %s status: %s" % (req['request_id'], req_status)
1479         log_msg = log_msg + "(transforms: total %s, finished: %s, subfinished: %s, failed %s)" % (total_tfs, finished_tfs, subfinished_tfs, failed_tfs)
1480         self.logger.debug(log_msg)
1481 
1482         if req_status not in [RequestStatus.Finished, RequestStatus.SubFinished, RequestStatus.Failed]:
1483             if self.is_to_expire(req['expired_at'], self.pending_time, request_id=req['request_id']):
1484                 event_content = {'request_id': req['request_id'],
1485                                  'cmd_type': CommandType.ExpireRequest,
1486                                  'cmd_content': {}}
1487                 self.logger.debug(log_pre + "ExpireRequestEvent(request_id: %s)" % req['request_id'])
1488                 event = ExpireRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event_content)
1489                 self.event_bus.send(event)
1490 
1491         parameters = {'status': req_status,
1492                       'locking': RequestLocking.Idle,
1493                       'request_metadata': req['request_metadata']
1494                       }
1495         parameters = self.load_poll_period(req, parameters)
1496 
1497         ret = {'request_id': req['request_id'],
1498                'parameters': parameters}
1499         self.logger.info(log_pre + "Handle update irequest result: %s" % str(ret))
1500         return ret
1501 
1502     def handle_update_irequest(self, req, event):
1503         """
1504         process running irequest
1505         """
1506         try:
1507             ret_req = self.handle_update_irequest_real(req, event)
1508         except Exception as ex:
1509             self.logger.error(ex)
1510             self.logger.error(traceback.format_exc())
1511             retries = req['update_retries'] + 1
1512             if not req['max_update_retries'] or retries < req['max_update_retries']:
1513                 req_status = req['status']
1514             else:
1515                 req_status = RequestStatus.Failed
1516             error = {'update_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1517 
1518             # increase poll period
1519             update_poll_period = int(req['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1520             if update_poll_period > self.max_update_poll_period:
1521                 update_poll_period = self.max_update_poll_period
1522 
1523             ret_req = {'request_id': req['request_id'],
1524                        'parameters': {'status': req_status,
1525                                       'locking': RequestLocking.Idle,
1526                                       'update_retries': retries,
1527                                       'update_poll_period': update_poll_period,
1528                                       'errors': req['errors'] if req['errors'] else {}}}
1529             ret_req['parameters']['errors'].update(error)
1530             log_pre = self.get_log_prefix(req)
1531             self.logger.warn(log_pre + "Handle update irequest exception result: %s" % str(ret_req))
1532         return ret_req
1533 
1534     def process_update_request(self, event=None, request=None):
1535         self.number_workers += 1
1536         pro_ret = ReturnCode.Ok.value
1537         try:
1538             if request is None and event:
1539                 # req_status = [RequestStatus.Transforming, RequestStatus.ToCancel, RequestStatus.Cancelling,
1540                 #               RequestStatus.ToSuspend, RequestStatus.Suspending,
1541                 #               RequestStatus.ToExpire, RequestStatus.Expiring,
1542                 #               RequestStatus.ToFinish, RequestStatus.ToForceFinish,
1543                 #               RequestStatus.ToResume, RequestStatus.Resuming,
1544                 #               RequestStatus.Building]
1545 
1546                 # req = self.get_request(request_id=event._request_id, status=req_status, locking=True)
1547                 self.logger.debug("process_update_request: event: %s" % str(event))
1548                 req = self.get_request(request_id=event._request_id, locking=True)
1549                 if not req:
1550                     self.logger.error("Cannot find request for event: %s" % str(event))
1551                     # pro_ret = ReturnCode.Locked.value
1552                     pro_ret = ReturnCode.Ok.value
1553                 else:
1554                     request = req
1555 
1556             if request:
1557                 req = request
1558                 log_pre = self.get_log_prefix(req)
1559                 self.logger.info(f"{log_pre} process_update_request request: {req['request_id']} event: {event}")
1560 
1561                 if req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1562                     ret = self.handle_update_irequest(req, event=event)
1563                 else:
1564                     ret = self.handle_update_request(req, event=event)
1565                 new_tf_ids, update_tf_ids = self.update_request(ret, origin_req=req)
1566                 for tf_id in new_tf_ids:
1567                     # self.logger.info(log_pre + "NewTransformEvent(transform_id: %s)" % tf_id)
1568                     # event = NewTransformEvent(publisher_id=self.id, transform_id=tf_id, content=event._content)
1569                     self.logger.info(log_pre + "QueueTransformEvent(transform_id: %s)" % str(tf_id))
1570                     event = QueueTransformEvent(publisher_id=self.id, transform_id=tf_id)
1571                     self.event_bus.send(event)
1572                 for tf_id in update_tf_ids:
1573                     self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % tf_id)
1574                     event = UpdateTransformEvent(publisher_id=self.id, transform_id=tf_id, content=event._content if event else None)
1575                     self.event_bus.send(event)
1576         except Exception as ex:
1577             self.logger.error(ex)
1578             self.logger.error(traceback.format_exc())
1579             pro_ret = ReturnCode.Failed.value
1580         self.number_workers -= 1
1581         return pro_ret
1582 
1583     def handle_abort_request(self, req, event):
1584         """
1585         process abort request
1586         """
1587         try:
1588             log_pre = self.get_log_prefix(req)
1589             self.logger.info(log_pre + "handle_abort_request event: %s" % str(event))
1590 
1591             to_abort = False
1592             to_abort_transform_id = None
1593             if (event and event._content and 'cmd_type' in event._content and event._content['cmd_type']
1594                 and event._content['cmd_type'] in [CommandType.AbortRequest, CommandType.ExpireRequest]):    # noqa W503
1595                 to_abort = True
1596                 self.logger.info(log_pre + "to_abort: %s" % to_abort)
1597                 if (event and event._content and 'cmd_content' in event._content and event._content['cmd_content']
1598                     and 'transform_id' in event._content['cmd_content']):                                    # noqa W503
1599                     to_abort_transform_id = event._content['cmd_content']['transform_id']
1600                     self.logger.info(log_pre + "to_abort_transform_id: %s" % to_abort_transform_id)
1601 
1602             if to_abort and to_abort_transform_id:
1603                 req_status = req['status']
1604             else:
1605                 if req['status'] in [RequestStatus.Building]:
1606                     wf = req['request_metadata']['build_workflow']
1607                 else:
1608                     if 'workflow' in req['request_metadata']:
1609                         wf = req['request_metadata']['workflow']
1610                     else:
1611                         wf = req['request_metadata']['build_workflow']
1612                 wf.to_cancel = True
1613                 req_status = RequestStatus.Cancelling
1614 
1615             ret_req = {'request_id': req['request_id'],
1616                        'parameters': {'status': req_status,
1617                                       'substatus': RequestStatus.ToCancel,
1618                                       'locking': RequestLocking.Idle,
1619                                       'request_metadata': req['request_metadata']},
1620                        }
1621             self.logger.info(log_pre + "handle_abort_request result: %s" % str(ret_req))
1622             return ret_req
1623         except Exception as ex:
1624             self.logger.error(ex)
1625             self.logger.error(traceback.format_exc())
1626             error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1627             ret_req = {'request_id': req['request_id'],
1628                        'parameters': {'status': RequestStatus.ToCancel,
1629                                       'locking': RequestLocking.Idle,
1630                                       'errors': req['errors'] if req['errors'] else {}}}
1631             ret_req['parameters']['errors'].update(error)
1632             self.logger.info(log_pre + "handle_abort_request exception result: %s" % str(ret_req))
1633         return ret_req
1634 
1635     def handle_command(self, event, cmd_status=None, command=None, errors=None):
1636         if (event and event._content and 'cmd_id' in event._content and event._content['cmd_id']):
1637             u_command = {'cmd_id': event._content['cmd_id'],
1638                          'status': cmd_status,
1639                          'locking': CommandLocking.Idle}
1640             if errors:
1641                 u_command['errors'] = errors
1642             core_commands.update_commands([u_command])
1643         if command:
1644             u_command = {'cmd_id': command,
1645                          'status': cmd_status,
1646                          'locking': CommandLocking.Idle}
1647             if errors:
1648                 u_command['errors'] = errors
1649             core_commands.update_commands([u_command])
1650 
1651     def process_abort_request(self, event=None, request=None, command=None):
1652         self.number_workers += 1
1653         pro_ret = ReturnCode.Ok.value
1654         try:
1655             if request is None and event:
1656                 req = self.get_request(request_id=event._request_id, locking=True)
1657                 if not req:
1658                     self.logger.warn("Cannot find request for event: %s" % str(event))
1659                     pro_ret = ReturnCode.Locked.value
1660                 else:
1661                     request = req
1662             if request:
1663                 req = request
1664                 log_pre = self.get_log_prefix(req)
1665                 self.logger.info(log_pre + f"process_abort_request request: {req['request_id']}, event: {event}, command: {command}")
1666 
1667                 if req['status'] in [RequestStatus.Finished, RequestStatus.SubFinished,
1668                                      RequestStatus.Failed, RequestStatus.Cancelled,
1669                                      RequestStatus.Suspended, RequestStatus.Expired]:
1670                     ret = {'request_id': req['request_id'],
1671                            'parameters': {'locking': RequestLocking.Idle,
1672                                           'command': CommandType.AbortRequest,
1673                                           'errors': {'extra_msg': "Request is already terminated. Cannot be aborted"}}}
1674                     if req['errors'] and 'msg' in req['errors']:
1675                         ret['parameters']['errors']['msg'] = req['errors']['msg']
1676                     self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
1677                     self.update_request(ret, origin_req=req)
1678                     self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors="Request is already terminated. Cannot be aborted")
1679                 elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1680                     ret = self.handle_close_irequest(req, event=event)
1681                     self.update_request(ret, origin_req=req)
1682 
1683                     # self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support abortion for iWorkflow")
1684                     self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
1685                 else:
1686                     ret = self.handle_abort_request(req, event)
1687                     self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
1688                     self.update_request(ret, origin_req=req)
1689 
1690                     core_transforms.abort_resume_transforms(request_id=req['request_id'], abort=True)
1691                     core_processings.abort_resume_processings(request_id=req['request_id'], abort=True)
1692 
1693                     to_abort_transform_id = None
1694                     if event and event._content and event._content['cmd_content'] and 'transform_id' in event._content['cmd_content']:
1695                         to_abort_transform_id = event._content['cmd_content']['transform_id']
1696 
1697                     if req['status'] in [RequestStatus.Building]:
1698                         wf = req['request_metadata']['build_workflow']
1699                     else:
1700                         if 'workflow' in req['request_metadata']:
1701                             wf = req['request_metadata']['workflow']
1702                         else:
1703                             wf = req['request_metadata']['build_workflow']
1704                     works = wf.get_all_works()
1705                     if works:
1706                         has_abort_work = False
1707                         for work in works:
1708                             if (work.is_started() or work.is_starting()) and not work.is_terminated():
1709                                 if not to_abort_transform_id or to_abort_transform_id == work.get_work_id():
1710                                     self.logger.info(log_pre + "AbortTransformEvent(transform_id: %s)" % str(work.get_work_id()))
1711                                     event = AbortTransformEvent(publisher_id=self.id,
1712                                                                 transform_id=work.get_work_id(),
1713                                                                 content=event._content if event else None)
1714                                     self.event_bus.send(event)
1715                                     has_abort_work = True
1716                         if not has_abort_work:
1717                             self.logger.info(log_pre + "not has abort work")
1718                             self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
1719                             event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
1720                             self.event_bus.send(event)
1721                     else:
1722                         # no works. should trigger update request
1723                         self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
1724                         event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
1725                         self.event_bus.send(event)
1726 
1727                     self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
1728         except AssertionError as ex:
1729             self.logger.error("process_abort_request, Failed to process event: %s" % str(event))
1730             self.logger.error(ex)
1731             self.logger.error(traceback.format_exc())
1732             self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=str(ex))
1733             pro_ret = ReturnCode.Failed.value
1734         except Exception as ex:
1735             self.logger.error(ex)
1736             self.logger.error(traceback.format_exc())
1737             pro_ret = ReturnCode.Failed.value
1738         self.number_workers -= 1
1739         return pro_ret
1740 
1741     def handle_close_irequest(self, req, event):
1742         """
1743         process close irequest
1744         """
1745         try:
1746             log_pre = self.get_log_prefix(req)
1747             self.logger.info(log_pre + "handle_close_irequest event: %s" % str(event))
1748 
1749             tfs = core_transforms.get_transforms(request_id=req['request_id'])
1750             total_tfs, finished_tfs, subfinished_tfs, failed_tfs = 0, 0, 0, 0
1751             for tf in tfs:
1752                 total_tfs += 1
1753                 if tf['status'] in [TransformStatus.Finished, TransformStatus.Built]:
1754                     finished_tfs += 1
1755                 elif tf['status'] in [TransformStatus.SubFinished]:
1756                     subfinished_tfs += 1
1757                 elif tf['status'] in [TransformStatus.Failed, TransformStatus.Cancelled,
1758                                       TransformStatus.Suspended, TransformStatus.Expired]:
1759                     failed_tfs += 1
1760                 else:
1761                     event = AbortTransformEvent(publisher_id=self.id,
1762                                                 transform_id=tf['transform_id'],
1763                                                 content=event._content if event else None)
1764                     self.event_bus.send(event)
1765 
1766             req_status = RequestStatus.Transforming
1767             if req['request_type'] in [RequestType.iWorkflowLocal] and total_tfs == 0:
1768                 req_status = RequestStatus.Finished
1769             else:
1770                 if total_tfs == finished_tfs:
1771                     req_status = RequestStatus.Finished
1772                 elif total_tfs == finished_tfs + subfinished_tfs + failed_tfs:
1773                     if finished_tfs + subfinished_tfs > 0:
1774                         req_status = RequestStatus.SubFinished
1775                     else:
1776                         req_status = RequestStatus.Failed
1777 
1778             log_msg = log_pre + "ireqeust %s status: %s" % (req['request_id'], req_status)
1779             log_msg = log_msg + "(transforms: total %s, finished: %s, subfinished: %s, failed %s)" % (total_tfs, finished_tfs, subfinished_tfs, failed_tfs)
1780             self.logger.debug(log_msg)
1781 
1782             parameters = {'status': req_status,
1783                           'substatus': RequestStatus.ToClose,
1784                           'locking': RequestLocking.Idle,
1785                           'request_metadata': req['request_metadata']
1786                           }
1787             parameters = self.load_poll_period(req, parameters)
1788 
1789             ret = {'request_id': req['request_id'],
1790                    'parameters': parameters}
1791             self.logger.info(log_pre + "Handle close irequest result: %s" % str(ret))
1792             return ret
1793 
1794         except Exception as ex:
1795             self.logger.error(ex)
1796             self.logger.error(traceback.format_exc())
1797             error = {'close_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1798             ret_req = {'request_id': req['request_id'],
1799                        'parameters': {'status': RequestStatus.ToClose,
1800                                       'locking': RequestLocking.Idle,
1801                                       'errors': req['errors'] if req['errors'] else {}}}
1802             ret_req['parameters']['errors'].update(error)
1803             self.logger.info(log_pre + "handle_close_irequest exception result: %s" % str(ret_req))
1804         return ret_req
1805 
1806     def process_close_request(self, event=None, request=None, command=None):
1807         self.number_workers += 1
1808         pro_ret = ReturnCode.Ok.value
1809         try:
1810             if request is None and event:
1811                 req = self.get_request(request_id=event._request_id, locking=True)
1812                 if not req:
1813                     self.logger.warn("Cannot find request for event: %s" % str(event))
1814                     pro_ret = ReturnCode.Locked.value
1815                 else:
1816                     request = req
1817             if request:
1818                 req = request
1819                 log_pre = self.get_log_prefix(req)
1820                 self.logger.info(log_pre + f"process_close_request request: {req['request_id']} event: {event}")
1821 
1822                 if req['status'] in [RequestStatus.Finished, RequestStatus.SubFinished,
1823                                      RequestStatus.Failed, RequestStatus.Cancelled,
1824                                      RequestStatus.Suspended, RequestStatus.Expired]:
1825                     ret = {'request_id': req['request_id'],
1826                            'parameters': {'locking': RequestLocking.Idle,
1827                                           'command': CommandType.CloseRequest,
1828                                           'errors': {'extra_msg': "Request is already terminated. Cannot be closed"}}}
1829                     if req['errors'] and 'msg' in req['errors']:
1830                         ret['parameters']['errors']['msg'] = req['errors']['msg']
1831                     self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
1832                     self.update_request(ret, origin_req=req)
1833                     self.handle_command(event, command=command, cmd_status=CommandStatus.Failed, errors="Request is already terminated. Cannot be closed")
1834                 else:
1835                     if req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1836                         ret = self.handle_close_irequest(req, event=event)
1837                         self.update_request(ret, origin_req=req)
1838                     else:
1839                         pass
1840                         ret = self.handle_abort_request(req, event)
1841                         self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
1842                         self.update_request(ret, origin_req=req)
1843                         to_abort_transform_id = None
1844                         if event and event._content and event._content['cmd_content'] and 'transform_id' in event._content['cmd_content']:
1845                             to_abort_transform_id = event._content['cmd_content']['transform_id']
1846 
1847                         if req['status'] in [RequestStatus.Building]:
1848                             wf = req['request_metadata']['build_workflow']
1849                         else:
1850                             if 'workflow' in req['request_metadata']:
1851                                 wf = req['request_metadata']['workflow']
1852                             else:
1853                                 wf = req['request_metadata']['build_workflow']
1854                         works = wf.get_all_works()
1855                         if works:
1856                             has_abort_work = False
1857                             for work in works:
1858                                 if (work.is_started() or work.is_starting()) and not work.is_terminated():
1859                                     if not to_abort_transform_id or to_abort_transform_id == work.get_work_id():
1860                                         self.logger.info(log_pre + "AbortTransformEvent(transform_id: %s)" % str(work.get_work_id()))
1861                                         event = AbortTransformEvent(publisher_id=self.id,
1862                                                                     transform_id=work.get_work_id(),
1863                                                                     content=event._content if event else None)
1864                                         self.event_bus.send(event)
1865                                         has_abort_work = True
1866                             if not has_abort_work:
1867                                 self.logger.info(log_pre + "not has abort work")
1868                                 self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
1869                                 event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
1870                                 self.event_bus.send(event)
1871                         else:
1872                             # no works. should trigger update request
1873                             self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
1874                             event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
1875                             self.event_bus.send(event)
1876 
1877                     self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
1878         except AssertionError as ex:
1879             self.logger.error("process_close_request, Failed to process event: %s" % str(event))
1880             self.logger.error(ex)
1881             self.logger.error(traceback.format_exc())
1882             self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=str(ex))
1883             pro_ret = ReturnCode.Failed.value
1884         except Exception as ex:
1885             self.logger.error(ex)
1886             self.logger.error(traceback.format_exc())
1887             pro_ret = ReturnCode.Failed.value
1888         self.number_workers -= 1
1889         return pro_ret
1890 
1891     def handle_resume_irequest(self, req, event):
1892         """
1893         process resume irequest
1894         """
1895         try:
1896             log_pre = self.get_log_prefix(req)
1897             self.logger.info(log_pre + "handle_resume_irequest event: %s" % str(event))
1898 
1899             tfs = core_transforms.get_transforms(request_id=req['request_id'])
1900             for tf in tfs:
1901                 if tf['status'] in [TransformStatus.Finished, TransformStatus.Built]:
1902                     continue
1903                 else:
1904                     event = ResumeTransformEvent(publisher_id=self.id,
1905                                                  transform_id=tf['transform_id'],
1906                                                  content=event._content if event else None)
1907                     self.event_bus.send(event)
1908 
1909             parameters = {'status': RequestStatus.Transforming,
1910                           'substatus': RequestStatus.ToResume,
1911                           'locking': RequestLocking.Idle,
1912                           'request_metadata': req['request_metadata']
1913                           }
1914             parameters = self.load_poll_period(req, parameters)
1915 
1916             ret = {'request_id': req['request_id'],
1917                    'parameters': parameters}
1918             self.logger.info(log_pre + "Handle resume irequest result: %s" % str(ret))
1919             return ret
1920         except Exception as ex:
1921             self.logger.error(ex)
1922             self.logger.error(traceback.format_exc())
1923             error = {'close_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1924             ret_req = {'request_id': req['request_id'],
1925                        'parameters': {'status': RequestStatus.ToClose,
1926                                       'locking': RequestLocking.Idle,
1927                                       'errors': req['errors'] if req['errors'] else {}}}
1928             ret_req['parameters']['errors'].update(error)
1929             self.logger.info(log_pre + "handle_close_irequest exception result: %s" % str(ret_req))
1930         return ret_req
1931 
1932     def handle_resume_request(self, req):
1933         """
1934         process resume request
1935         """
1936         try:
1937             req_status = RequestStatus.Resuming
1938 
1939             processing_metadata = req['processing_metadata']
1940 
1941             if 'workflow' in req['request_metadata'] and req['request_metadata']['workflow'] is not None:
1942                 wf = req['request_metadata']['workflow']
1943                 wf.resume_works()
1944             elif 'build_workflow' in req['request_metadata'] and req['request_metadata']['build_workflow'] is not None:
1945                 req_status = RequestStatus.Building
1946             else:
1947                 req_status = RequestStatus.Failed
1948 
1949             ret_req = {'request_id': req['request_id'],
1950                        'parameters': {'status': req_status,
1951                                       'request_metadata': req['request_metadata'],
1952                                       'processing_metadata': processing_metadata,
1953                                       'locking': RequestLocking.Idle},
1954                        }
1955             return ret_req
1956         except Exception as ex:
1957             self.logger.error(ex)
1958             self.logger.error(traceback.format_exc())
1959             error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1960             ret_req = {'request_id': req['request_id'],
1961                        'parameters': {'status': RequestStatus.ToResume,
1962                                       'locking': RequestLocking.Idle,
1963                                       'errors': req['errors'] if req['errors'] else {}}}
1964             ret_req['parameters']['errors'].update(error)
1965         return ret_req
1966 
1967     def process_resume_request(self, event=None, request=None, command=None):
1968         self.number_workers += 1
1969         pro_ret = ReturnCode.Ok.value
1970         try:
1971             if request is None and event:
1972                 req = self.get_request(request_id=event._request_id, locking=True)
1973                 if not req:
1974                     self.logger.error("Cannot find request for event: %s" % str(event))
1975                     pro_ret = ReturnCode.Locked.value
1976                 else:
1977                     request = req
1978             if request:
1979                 req = request
1980                 log_pre = self.get_log_prefix(req)
1981                 self.logger.info(log_pre + f"process_resume_request request: {req['request_id']} event: {event}")
1982 
1983                 if req['status'] in [RequestStatus.Finished]:
1984                     ret = {'request_id': req['request_id'],
1985                            'parameters': {'locking': RequestLocking.Idle,
1986                                           'command': CommandType.ResumeRequest,
1987                                           'errors': {'extra_msg': "Request is already finished. Cannot be resumed"}}}
1988                     if req['errors'] and 'msg' in req['errors']:
1989                         ret['parameters']['errors']['msg'] = req['errors']['msg']
1990                     self.logger.info(log_pre + "process_resume_request result: %s" % str(ret))
1991 
1992                     self.update_request(ret, origin_req=req)
1993                     self.handle_command(event, command=command, cmd_status=CommandStatus.Failed, errors="Request is already finished. Cannot be resumed")
1994                 elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
1995                     ret = self.handle_resume_irequest(req)
1996                     self.update_request(ret, origin_req=req)
1997                     # self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support to reusme for iWorkflow")
1998                     self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
1999                 else:
2000                     ret = self.handle_resume_request(req)
2001                     self.logger.info(log_pre + "process_resume_request result: %s" % str(ret))
2002 
2003                     self.update_request(ret, origin_req=req)
2004                     if 'workflow' in req['request_metadata']:
2005                         wf = req['request_metadata']['workflow']
2006                         works = wf.get_all_works()
2007                         if works:
2008                             for work in works:
2009                                 # if not work.is_finished():
2010                                 self.logger.info(log_pre + "ResumeTransformEvent(transform_id: %s)" % str(work.get_work_id()))
2011                                 event = ResumeTransformEvent(publisher_id=self.id,
2012                                                              transform_id=work.get_work_id(),
2013                                                              content=event._content if event else None)
2014                                 self.event_bus.send(event)
2015                         else:
2016                             self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id']))
2017                             event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content if event else None)
2018                             self.event_bus.send(event)
2019 
2020                     core_transforms.abort_resume_transforms(request_id=req['request_id'], resume=True)
2021                     core_processings.abort_resume_processings(request_id=req['request_id'], resume=True)
2022 
2023                     self.handle_command(event, command=command, cmd_status=CommandStatus.Processed, errors=None)
2024         except Exception as ex:
2025             self.logger.error(ex)
2026             self.logger.error(traceback.format_exc())
2027             pro_ret = ReturnCode.Failed.value
2028         self.number_workers -= 1
2029         return pro_ret
2030 
2031     def clean_locks(self, force=False):
2032         try:
2033             self.logger.info(f"clean locking: force: {force}")
2034             health_items = self.get_health_items()
2035             min_request_id = BaseAgent.min_request_id
2036             hostname, pid, thread_id, thread_name = self.get_process_thread_info()
2037             core_requests.clean_locking(health_items=health_items, min_request_id=min_request_id,
2038                                         time_period=self.clean_locks_time_period,
2039                                         force=force, hostname=hostname, pid=pid)
2040         except Exception as ex:
2041             self.logger.info(f"Failed clean locking: {ex}")
2042 
2043     def init_event_function_map(self):
2044         self.event_func_map = {
2045             EventType.NewRequest: {
2046                 'pre_check': self.is_ok_to_run_more_requests,
2047                 'exec_func': self.process_new_request
2048             },
2049             EventType.UpdateRequest: {
2050                 'pre_check': self.is_ok_to_run_more_requests,
2051                 'exec_func': self.process_update_request
2052             },
2053             EventType.AbortRequest: {
2054                 'pre_check': self.is_ok_to_run_more_requests,
2055                 'exec_func': self.process_abort_request
2056             },
2057             EventType.ExpireRequest: {
2058                 'pre_check': self.is_ok_to_run_more_requests,
2059                 'exec_func': self.process_abort_request
2060             },
2061             EventType.ResumeRequest: {
2062                 'pre_check': self.is_ok_to_run_more_requests,
2063                 'exec_func': self.process_resume_request
2064             },
2065             EventType.CloseRequest: {
2066                 'pre_check': self.is_ok_to_run_more_requests,
2067                 'exec_func': self.process_close_request
2068             }
2069         }
2070 
2071     def run(self):
2072         """
2073         Main run function.
2074         """
2075         try:
2076             self.logger.info("Starting main thread")
2077             self.init_thread_info()
2078 
2079             self.load_plugins()
2080 
2081             self.add_default_tasks()
2082             self.clean_locks(force=True)
2083 
2084             self.init_event_function_map()
2085 
2086             task = self.create_task(task_func=self.get_new_requests, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
2087             self.add_task(task)
2088             task = self.create_task(task_func=self.get_running_requests, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
2089             self.add_task(task)
2090             task = self.create_task(task_func=self.get_operation_requests, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
2091             self.add_task(task)
2092             task = self.create_task(task_func=self.clean_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=3600, priority=1)
2093             self.add_task(task)
2094             task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=60, priority=1)
2095             self.add_task(task)
2096 
2097             self.execute()
2098         except KeyboardInterrupt:
2099             self.stop()
2100 
2101 
if __name__ == '__main__':
    # Script entry point: create the Clerk agent and invoke it.
    # NOTE(review): calling the instance relies on __call__ defined outside
    # this chunk; presumably it enters run() - confirm.
    clerk = Clerk()
    clerk()