Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 import copy
0012 import datetime
0013 import random
0014 import time
0015 import traceback
0016 
0017 from idds.common import exceptions
0018 from idds.common.constants import (Sections, ReturnCode, TransformType,
0019                                    TransformStatus, TransformLocking,
0020                                    CollectionType, CollectionStatus,
0021                                    CollectionRelationType,
0022                                    ContentStatus, ContentRelationType,
0023                                    CommandType, ProcessingStatus, WorkflowType,
0024                                    ConditionStatus,
0025                                    get_processing_type_from_transform_type,
0026                                    get_transform_status_from_processing_status)
0027 from idds.common.utils import setup_logging, truncate_string
0028 from idds.core import (transforms as core_transforms,
0029                        processings as core_processings,
0030                        catalog as core_catalog,
0031                        throttlers as core_throttlers,
0032                        conditions as core_conditions)
0033 from idds.agents.common.baseagent import BaseAgent
0034 from idds.agents.common.eventbus.event import (EventType,
0035                                                NewTransformEvent,
0036                                                UpdateTransformEvent,
0037                                                AbortProcessingEvent,
0038                                                ResumeProcessingEvent,
0039                                                UpdateRequestEvent,
0040                                                NewProcessingEvent,
0041                                                UpdateProcessingEvent)
0042 
0043 from idds.agents.common.cache.redis import get_redis_cache
0044 
0045 setup_logging(__name__)
0046 
0047 
0048 class Transformer(BaseAgent):
0049     """
0050     Transformer works to process transforms.
0051     """
0052 
0053     def __init__(self, num_threads=1, max_number_workers=8, poll_period=1800, retries=3, retrieve_bulk_size=3,
0054                  use_process_pool=False, message_bulk_size=10000, **kwargs):
0055         self.max_number_workers = max_number_workers
0056         self.set_max_workers()
0057         num_threads = self.max_number_workers
0058         super(Transformer, self).__init__(num_threads=num_threads, name='Transformer', use_process_pool=use_process_pool, **kwargs)
0059         self.config_section = Sections.Transformer
0060         self.poll_period = int(poll_period)
0061         self.retries = int(retries)
0062         self.retrieve_bulk_size = int(retrieve_bulk_size)
0063         self.message_bulk_size = int(message_bulk_size)
0064 
0065         if not hasattr(self, 'new_poll_period') or not self.new_poll_period:
0066             self.new_poll_period = self.poll_period
0067         else:
0068             self.new_poll_period = int(self.new_poll_period)
0069         if not hasattr(self, 'update_poll_period') or not self.update_poll_period:
0070             self.update_poll_period = self.poll_period
0071         else:
0072             self.update_poll_period = int(self.update_poll_period)
0073 
0074         if not hasattr(self, 'new_poll_period') or not self.new_poll_period:
0075             self.new_poll_period = self.poll_period
0076         else:
0077             self.new_poll_period = int(self.new_poll_period)
0078         if not hasattr(self, 'update_poll_period') or not self.update_poll_period:
0079             self.update_poll_period = self.poll_period
0080         else:
0081             self.update_poll_period = int(self.update_poll_period)
0082 
0083         if hasattr(self, 'poll_period_increase_rate'):
0084             self.poll_period_increase_rate = float(self.poll_period_increase_rate)
0085         else:
0086             self.poll_period_increase_rate = 2
0087 
0088         if hasattr(self, 'max_new_poll_period'):
0089             self.max_new_poll_period = int(self.max_new_poll_period)
0090         else:
0091             self.max_new_poll_period = 3600 * 6
0092         if hasattr(self, 'max_update_poll_period'):
0093             self.max_update_poll_period = int(self.max_update_poll_period)
0094         else:
0095             self.max_update_poll_period = 3600 * 6
0096 
0097         self.number_workers = 0
0098         if not hasattr(self, 'max_number_workers') or not self.max_number_workers:
0099             self.max_number_workers = 3
0100         else:
0101             self.max_number_workers = int(self.max_number_workers)
0102 
0103         self.show_queue_size_time = None
0104 
0105         if hasattr(self, 'cache_expire_seconds'):
0106             self.cache_expire_seconds = int(self.cache_expire_seconds)
0107         else:
0108             self.cache_expire_seconds = 300
0109 
0110         if hasattr(self, 'clean_locks_time_period'):
0111             self.clean_locks_time_period = int(self.clean_locks_time_period)
0112         else:
0113             self.clean_locks_time_period = 1800
0114 
0115     def is_ok_to_run_more_transforms(self):
0116         if self.get_num_free_workers() > 0:
0117             return True
0118         return False
0119 
0120     def show_queue_size(self):
0121         if self.show_queue_size_time is None or time.time() - self.show_queue_size_time >= 600:
0122             self.show_queue_size_time = time.time()
0123             q_str = "number of transforms: %s, max number of transforms: %s" % (self.get_num_workers(), self.get_max_workers())
0124             self.logger.debug(q_str)
0125 
0126     def get_bulk_size(self):
0127         return min(self.retrieve_bulk_size, self.get_num_free_workers())
0128 
0129     def get_throttlers(self):
0130         """
0131         Use throttler
0132         """
0133         cache = get_redis_cache()
0134         throttlers = cache.get("throttlers", default=None)
0135         if throttlers is None:
0136             throttler_items = core_throttlers.get_throttlers()
0137             throttlers = {}
0138             for item in throttler_items:
0139                 throttlers[item['site']] = {'num_requests': item['num_requests'],
0140                                             'num_transforms': item['num_transforms'],
0141                                             'num_processings': item['num_processings'],
0142                                             'new_contents': item['new_contents'],
0143                                             'queue_contents': item['queue_contents'],
0144                                             'others': item['others'],
0145                                             'status': item['status']}
0146             cache.set("throttlers", throttlers, expire_seconds=self.cache_expire_seconds)
0147         return throttlers
0148 
0149     def get_num_active_transforms(self, site_name):
0150         cache = get_redis_cache()
0151         num_transforms = cache.get("num_transforms", default=None)
0152         if num_transforms is None:
0153             num_transforms = {}
0154             active_status = [TransformStatus.New, TransformStatus.Ready]
0155             active_status1 = [TransformStatus.Transforming, TransformStatus.Terminating]
0156             rets = core_transforms.get_num_active_transforms(active_status + active_status1)
0157             for ret in rets:
0158                 status, site, count = ret
0159                 if site is None:
0160                     site = 'Default'
0161                 if site not in num_transforms:
0162                     num_transforms[site] = {'new': 0, 'processing': 0}
0163                 if status in active_status:
0164                     num_transforms[site]['new'] += count
0165                 elif status in active_status1:
0166                     num_transforms[site]['processing'] += count
0167             cache.set("num_transforms", num_transforms, expire_seconds=self.cache_expire_seconds)
0168         default_value = {'new': 0, 'processing': 0}
0169         return num_transforms.get(site_name, default_value)
0170 
0171     def get_num_active_processings(self, site_name):
0172         cache = get_redis_cache()
0173         num_processings = cache.get("num_processings", default=None)
0174         active_transforms = cache.get("active_transforms", default={})
0175         if num_processings is None:
0176             num_processings = {}
0177             active_transforms = {}
0178             active_status = [ProcessingStatus.New]
0179             active_status1 = [ProcessingStatus.Submitting, ProcessingStatus.Submitted,
0180                               ProcessingStatus.Running, ProcessingStatus.Terminating, ProcessingStatus.ToTrigger,
0181                               ProcessingStatus.Triggering]
0182             rets = core_processings.get_active_processings(active_status + active_status1)
0183             for ret in rets:
0184                 req_id, trf_id, pr_id, site, status = ret
0185                 if site is None:
0186                     site = 'Default'
0187                 if site not in num_processings:
0188                     num_processings[site] = {'new': 0, 'processing': 0}
0189                     active_transforms[site] = []
0190                 if status in active_status:
0191                     num_processings[site]['new'] += 1
0192                 elif status in active_status1:
0193                     num_processings[site]['processing'] += 1
0194                 active_transforms[site].append(trf_id)
0195             cache.set("num_processings", num_processings, expire_seconds=self.cache_expire_seconds)
0196             cache.set("active_transforms", active_transforms, expire_seconds=self.cache_expire_seconds)
0197         default_value = {'new': 0, 'processing': 0}
0198         return num_processings.get(site_name, default_value), active_transforms
0199 
0200     def get_num_active_contents(self, site_name, active_transform_ids):
0201         cache = get_redis_cache()
0202         # 1. input contents not terminated
0203         # 2. output contents not terminated
0204         tf_id_site_map = {}
0205         all_tf_ids = []
0206         for site in active_transform_ids:
0207             all_tf_ids += active_transform_ids[site]
0208             for tf_id in active_transform_ids[site]:
0209                 tf_id_site_map[tf_id] = site
0210 
0211         num_input_contents = cache.get("num_input_contents", default=None)
0212         num_output_contents = cache.get("num_output_contents", default=None)
0213         if num_input_contents is None or num_output_contents is None:
0214             num_input_contents, num_output_contents = {}, {}
0215             if all_tf_ids:
0216                 ret = core_catalog.get_content_status_statistics_by_relation_type(all_tf_ids)
0217                 for item in ret:
0218                     status, relation_type, transform_id, count = item
0219                     site = tf_id_site_map[transform_id]
0220                     if site not in num_input_contents:
0221                         num_input_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
0222                         num_output_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
0223                     if status in [ContentStatus.New]:
0224                         if relation_type == ContentRelationType.Input:
0225                             num_input_contents[site]['new'] += count
0226                         elif relation_type == ContentRelationType.Output:
0227                             num_output_contents[site]['new'] += count
0228                     if status in [ContentStatus.Activated]:
0229                         if relation_type == ContentRelationType.Input:
0230                             num_input_contents[site]['activated'] += count
0231                         elif relation_type == ContentRelationType.Output:
0232                             num_output_contents[site]['activated'] += count
0233                     else:
0234                         if relation_type == ContentRelationType.Input:
0235                             num_input_contents[site]['processed'] += count
0236                         elif relation_type == ContentRelationType.Output:
0237                             num_output_contents[site]['processed'] += count
0238 
0239             cache.set("num_input_contents", num_input_contents, expire_seconds=self.cache_expire_seconds)
0240             cache.set("num_output_contents", num_output_contents, expire_seconds=self.cache_expire_seconds)
0241         default_value = {'new': 0, 'activated': 0, 'processed': 0}
0242         return num_input_contents.get(site_name, default_value), num_output_contents.get(site_name, default_value)
0243 
0244     def get_closest_site(self, task_site, throttler_sites):
0245         try:
0246             self.logger.debug(f"task_site: {task_site}, throttler_sites: {throttler_sites}")
0247             if ',' in task_site:
0248                 cloud, site, queue = task_site.split(",")
0249             else:
0250                 # cloud = None
0251                 site = task_site
0252                 queue = None
0253 
0254             # Sort by length (descending) and alphabetically
0255             sorted_sites = sorted(throttler_sites, key=lambda x: (-len(x), x))
0256             for s in sorted_sites:
0257                 if queue and queue.startswith(s):
0258                     return s
0259                 elif site and site.startswith(s):
0260                     return s
0261         except Exception as ex:
0262             self.logger.warn(f"Failed to find closest site for {task_site}: {ex}")
0263         return None
0264 
0265     def whether_to_throttle(self, transform):
0266         try:
0267             throttlers = self.get_throttlers()
0268 
0269             site = transform['site']
0270             if site is None:
0271                 site = 'Default'
0272             else:
0273                 throttler_sites = [site for site in throttlers]
0274                 site = self.get_closest_site(site, throttler_sites)
0275                 if site is None:
0276                     site = 'Default'
0277             self.logger.info(f"throttler closest site for {transform['site']} is {site}")
0278 
0279             num_transforms = self.get_num_active_transforms(site)
0280             num_processings, active_transforms = self.get_num_active_processings(site)
0281             num_input_contents, num_output_contents = self.get_num_active_contents(site, active_transforms)
0282             self.logger.info("throttler(site: %s): transforms(%s), processings(%s)" % (site, num_transforms, num_processings))
0283             self.logger.info("throttler(site: %s): active input contents(%s), output contents(%s)" % (site, num_input_contents, num_output_contents))
0284 
0285             throttle_transforms = throttlers.get(site, {}).get('num_transforms', None)
0286             throttle_processings = throttlers.get(site, {}).get('num_processings', None)
0287             throttle_new_jobs = throttlers.get(site, {}).get('new_contents', None)
0288             throttle_queue_jobs = throttlers.get(site, {}).get('queue_contents', None)
0289             self.logger.info("throttler(site: %s): throttle_transforms: %s, throttle_processings: %s" % (site, throttle_transforms, throttle_processings))
0290             if throttle_transforms:
0291                 if num_transforms['processing'] >= throttle_transforms:
0292                     self.logger.info("throttler(site: %s): num of processing transforms (%s) is bigger than throttle_transforms (%s), set throttling" % (site, num_transforms['processing'], throttle_transforms))
0293                     return True
0294             if throttle_processings:
0295                 if num_processings['processing'] >= throttle_processings:
0296                     self.logger.info("throttler(site: %s): num of processing processings (%s) is bigger than throttle_processings (%s), set throttling" % (site, num_processings['processing'], throttle_processings))
0297                     return True
0298 
0299             new_jobs = num_input_contents['new']
0300             released_jobs = num_input_contents['processed']
0301             terminated_jobs = num_output_contents['processed']
0302             queue_jobs = released_jobs - terminated_jobs
0303 
0304             self.logger.info("throttler(site: %s): throttle_new_jobs: %s, throttle_queue_jobs: %s" % (site, throttle_new_jobs, throttle_queue_jobs))
0305             self.logger.info("throttler(site: %s): new_jobs: %s, queue_jobs: %s" % (site, new_jobs, queue_jobs))
0306             if throttle_new_jobs:
0307                 if new_jobs >= throttle_new_jobs:
0308                     self.logger.info("throttler(site: %s): num of new jobs(not released) (%s) is bigger than throttle_new_jobs (%s), set throttling" % (site, new_jobs, throttle_new_jobs))
0309                     return True
0310             if throttle_queue_jobs:
0311                 if queue_jobs >= throttle_queue_jobs:
0312                     self.logger.info("throttler(site: %s): num of queue jobs(released but not terminated) (%s) is bigger than throttle_queue_jobs (%s), set throttling" % (site, queue_jobs, throttle_queue_jobs))
0313                     return True
0314 
0315             return False
0316         except Exception as ex:
0317             self.logger.error("whether_to_throttle: %s" % str(ex))
0318             self.logger.error(traceback.format_exc())
0319         return False
0320 
0321     def get_queue_transforms(self):
0322         """
0323         Get queue transforms to set them to new if the throttler is ok.
0324         """
0325         try:
0326             if not self.is_ok_to_run_more_transforms():
0327                 return []
0328 
0329             self.show_queue_size()
0330 
0331             if BaseAgent.min_request_id is None:
0332                 return []
0333 
0334             transform_status = [TransformStatus.Queue, TransformStatus.Throttling]
0335             # next_poll_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_period)
0336             transforms_q = core_transforms.get_transforms_by_status(status=transform_status, locking=True,
0337                                                                     not_lock=False, order_by_fifo=True,
0338                                                                     new_poll=True,
0339                                                                     min_request_id=BaseAgent.min_request_id)
0340 
0341             # self.logger.debug("Main thread get %s New+Ready+Extend transforms to process" % len(transforms_new))
0342             if transforms_q:
0343                 self.logger.info("Main thread get queued transforms to process: %s" % str(transforms_q))
0344                 for tf in transforms_q:
0345                     self.process_queue_transform(transform=tf)
0346         except exceptions.DatabaseException as ex:
0347             if 'ORA-00060' in str(ex):
0348                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0349             else:
0350                 # raise ex
0351                 self.logger.error(ex)
0352                 self.logger.error(traceback.format_exc())
0353         return []
0354 
0355     def process_queue_transform(self, event=None, transform=None):
0356         self.number_workers += 1
0357         try:
0358             if transform is None and event:
0359                 tf_status = [TransformStatus.Queue, TransformStatus.Throttling]
0360                 tf = self.get_transform(transform_id=event._transform_id, status=tf_status, locking=True)
0361                 if not tf:
0362                     self.logger.warn("Cannot find transform for event: %s" % str(event))
0363                 else:
0364                     transform = tf
0365             if transform:
0366                 tf = transform
0367                 log_pre = self.get_log_prefix(tf)
0368                 self.logger.info(log_pre + "process_queue_transform")
0369                 to_throttle = self.whether_to_throttle(tf)
0370                 transform_parameters = {'locking': TransformLocking.Idle}
0371                 parameters = self.load_poll_period(tf, transform_parameters)
0372                 if to_throttle:
0373                     parameters['status'] = TransformStatus.Throttling
0374                     core_transforms.update_transform(transform_id=tf['transform_id'], parameters=parameters)
0375                 else:
0376                     # parameters['status'] = TransformStatus.New
0377                     # self.submit(self.process_new_transform, transform=tf)
0378                     # self.process_new_transform(transform=tf)
0379                     parameters['status'] = TransformStatus.New
0380                     core_transforms.update_transform(transform_id=tf['transform_id'], parameters=parameters)
0381 
0382                     self.logger.info(log_pre + "NewTransformEvent(transform_id: %s)" % str(tf['transform_id']))
0383                     event = NewTransformEvent(publisher_id=self.id, transform_id=tf['transform_id'])
0384                     self.event_bus.send(event)
0385         except Exception as ex:
0386             self.logger.error(ex)
0387             self.logger.error(traceback.format_exc())
0388         self.number_workers -= 1
0389 
0390     def get_new_transforms(self):
0391         """
0392         Get new transforms to process
0393         """
0394         try:
0395             if not self.is_ok_to_run_more_transforms():
0396                 return []
0397 
0398             self.show_queue_size()
0399 
0400             if BaseAgent.min_request_id is None:
0401                 return []
0402 
0403             transform_status = [TransformStatus.New, TransformStatus.Ready, TransformStatus.Extend]
0404             # next_poll_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_period)
0405             transforms_new = core_transforms.get_transforms_by_status(status=transform_status, locking=True,
0406                                                                       not_lock=True, order_by_fifo=True,
0407                                                                       new_poll=True,
0408                                                                       min_request_id=BaseAgent.min_request_id,
0409                                                                       bulk_size=self.get_bulk_size())
0410 
0411             # self.logger.debug("Main thread get %s New+Ready+Extend transforms to process" % len(transforms_new))
0412             if transforms_new:
0413                 tf_ids = [tf['transform_id'] for tf in transforms_new]
0414                 self.logger.info("Main thread get New+Ready+Extend transforms to process: %s" % str(tf_ids))
0415 
0416             for tf in transforms_new:
0417                 self.submit(self.process_new_transform, transform=tf)
0418 
0419             return transforms_new
0420         except exceptions.DatabaseException as ex:
0421             if 'ORA-00060' in str(ex):
0422                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0423             else:
0424                 # raise ex
0425                 self.logger.error(ex)
0426                 self.logger.error(traceback.format_exc())
0427         return []
0428 
0429     def get_running_transforms(self):
0430         """
0431         Get running transforms
0432         """
0433         try:
0434             if not self.is_ok_to_run_more_transforms():
0435                 return []
0436 
0437             self.show_queue_size()
0438 
0439             if BaseAgent.min_request_id is None:
0440                 return []
0441 
0442             transform_status = [TransformStatus.Transforming,
0443                                 TransformStatus.ToCancel, TransformStatus.Cancelling,
0444                                 TransformStatus.ToSuspend, TransformStatus.Suspending,
0445                                 TransformStatus.ToExpire, TransformStatus.Expiring,
0446                                 TransformStatus.ToResume, TransformStatus.Resuming,
0447                                 TransformStatus.ToFinish, TransformStatus.ToForceFinish]
0448             transforms = core_transforms.get_transforms_by_status(status=transform_status,
0449                                                                   period=None,
0450                                                                   locking=True,
0451                                                                   not_lock=True,
0452                                                                   min_request_id=BaseAgent.min_request_id,
0453                                                                   update_poll=True,
0454                                                                   bulk_size=self.get_bulk_size())
0455 
0456             # self.logger.debug("Main thread get %s transforming transforms to process" % len(transforms))
0457             if transforms:
0458                 tf_ids = [tf['transform_id'] for tf in transforms]
0459                 self.logger.info("Main thread get transforming transforms to process: %s" % str(tf_ids))
0460 
0461             for tf in transforms:
0462                 self.submit(self.process_update_transform, transform=tf)
0463 
0464             return transforms
0465         except exceptions.DatabaseException as ex:
0466             if 'ORA-00060' in str(ex):
0467                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0468             else:
0469                 self.logger.error(ex)
0470                 self.logger.error(traceback.format_exc())
0471         return []
0472 
0473     def get_transform(self, transform_id, status=None, locking=False):
0474         try:
0475             return core_transforms.get_transform_by_id_status(transform_id=transform_id, status=status, locking=locking)
0476         except exceptions.DatabaseException as ex:
0477             if 'ORA-00060' in str(ex):
0478                 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0479             else:
0480                 # raise ex
0481                 self.logger.error(ex)
0482                 self.logger.error(traceback.format_exc())
0483         return None
0484 
0485     def load_poll_period(self, transform, parameters):
0486         if self.new_poll_period and transform['new_poll_period'] != self.new_poll_period:
0487             parameters['new_poll_period'] = self.new_poll_period
0488         if self.update_poll_period and transform['update_poll_period'] != self.update_poll_period:
0489             parameters['update_poll_period'] = self.update_poll_period
0490         return parameters
0491 
0492     def trigger_condition(self, request_id, condition):
0493         update_condition = {}
0494         update_transforms = []
0495         cond = core_conditions.load_condition(condition)
0496         is_triggered, is_updated, ret = cond.evaluate()
0497         if is_triggered or is_updated:
0498             update_condition['condition_id'] = cond.condition_id
0499             update_condition['previous_works'] = cond.previous_works   # previous_works = {'internal_id': <>, 'status': <OK/NotOK>}
0500 
0501         if is_triggered:
0502             update_condition['condition_id'] = cond.condition_id
0503             update_condition['status'] = ConditionStatus.Triggered
0504             update_condition['evaluate_result'] = cond.result
0505 
0506             triggered_works = cond.get_triggered_works(ret)
0507             if triggered_works:
0508                 internal_ids = [w['internal_id'] for w in triggered_works]
0509                 triggered_transforms = core_transforms.get_transforms(request_id=request_id,
0510                                                                       loop_index=condition['loop_index'],
0511                                                                       internal_ids=internal_ids)
0512                 for tf in triggered_transforms:
0513                     if tf['status'] in [TransformStatus.WaitForTrigger]:
0514                         # change transform status from WaitForTrigger to New
0515                         u_transform = {'transform_id': tf['transform_id'], 'status': TransformStatus.New}
0516                         update_transforms.append(u_transform)
0517             return is_triggered, is_updated, update_condition, update_transforms
0518 
0519     def evaluate_conditions(self, transform):
0520         if not transform['has_conditons']:
0521             return
0522 
0523         update_conditions = []
0524         all_update_triggered_transforms = []
0525         update_current_transform = None
0526 
0527         loop_index = transform['loop_index']
0528         triggered_conditions = transform['triggered_conditions']
0529         untriggered_conditions = transform['untriggered_conditions']
0530 
0531         new_triggered_conditions = []
0532         u_cond_ids = [u_cond['internal_id'] for u_cond in untriggered_conditions]
0533         conditions = core_conditions.get_condtions(request_id=transform['request_id'], internal_ids=u_cond_ids, loop_index=loop_index)
0534         cond_dict = {}
0535         for cond in conditions:
0536             if (loop_index is None and cond['loop_index'] is None) or (loop_index == cond['loop_index']):
0537                 cond_dict[cond['internal_id']] = cond
0538         for u_cond in untriggered_conditions:
0539             cond = cond_dict[u_cond['internal_id']]
0540             if cond['status'] not in [ConditionStatus.WaitForTrigger]:
0541                 ret = self.trigger_condition(request_id=transform['request_id'], condition=cond)
0542                 is_triggered, is_updated, update_condition, update_triggered_transforms = ret
0543                 if is_triggered or is_updated:
0544                     # is_triggered: the condition is triggered
0545                     # is_updated: the condition has multiple previous items. The item related to current transform is updated to ok,
0546                     #             waiting for the other item to be ok.
0547                     new_triggered_conditions.append(u_cond)
0548                 if update_condition:
0549                     update_conditions.append(update_condition)
0550                 if update_triggered_transforms:
0551                     all_update_triggered_transforms = all_update_triggered_transforms + update_triggered_transforms
0552             else:
0553                 new_triggered_conditions.append(u_cond)
0554         if new_triggered_conditions:
0555             new_triggered_conditions_dict = {new_cond['internal_id']: new_cond for new_cond in new_triggered_conditions}
0556             untriggered_conditions_copy = copy.deepcopy(untriggered_conditions)
0557             untriggered_conditions = []
0558             for u_cond in untriggered_conditions_copy:
0559                 if u_cond['internal_id'] in new_triggered_conditions_dict:
0560                     triggered_conditions.append(u_cond)
0561                 else:
0562                     untriggered_conditions.append(u_cond)
0563             update_current_transform = {'transform_id': transform['transform_id'],
0564                                         'triggered_conditions': triggered_conditions,
0565                                         'untriggered_conditions': untriggered_conditions}
0566         # return new_triggered_conditions, triggered_conditions, untriggered_conditions
0567         return update_current_transform, update_conditions, all_update_triggered_transforms
0568 
0569     def generate_processing_model(self, transform):
0570         new_processing_model = {}
0571         new_processing_model['transform_id'] = transform['transform_id']
0572         new_processing_model['request_id'] = transform['request_id']
0573         new_processing_model['workload_id'] = transform['workload_id']
0574         new_processing_model['status'] = ProcessingStatus.New
0575         # new_processing_model['expired_at'] = work.get_expired_at(None)
0576         new_processing_model['expired_at'] = transform['expired_at']
0577 
0578         new_processing_model['loop_index'] = transform['loop_index']
0579         new_processing_model['internal_id'] = transform['internal_id']
0580         new_processing_model['parent_internal_id'] = transform['parent_internal_id']
0581 
0582         new_processing_model['processing_type'] = get_processing_type_from_transform_type(transform['transform_type'])
0583         new_processing_model['new_poll_period'] = transform['new_poll_period']
0584         new_processing_model['update_poll_period'] = transform['update_poll_period']
0585         new_processing_model['max_new_retries'] = transform['max_new_retries']
0586         new_processing_model['max_update_retries'] = transform['max_update_retries']
0587         return new_processing_model
0588 
0589     def get_log_prefix(self, transform):
0590         if transform:
0591             return "<request_id=%s,transform_id=%s>" % (transform['request_id'], transform['transform_id'])
0592         self.logger.error("get_log_prefix transform is empty: %s" % str(transform))
0593         return ""
0594 
0595     def handle_new_transform_real(self, transform):
0596         """
0597         Process new transform
0598         """
0599         log_pre = self.get_log_prefix(transform)
0600         self.logger.info(log_pre + "handle_new_transform: transform_id: %s" % transform['transform_id'])
0601 
0602         work = transform['transform_metadata']['work']
0603         work.set_work_id(transform['transform_id'])
0604         work.set_agent_attributes(self.agent_attributes, transform)
0605 
0606         work_name_to_coll_map = core_transforms.get_work_name_to_coll_map(request_id=transform['request_id'])
0607         work.set_work_name_to_coll_map(work_name_to_coll_map)
0608 
0609         new_processing_model = None
0610 
0611         processing = work.get_processing(input_output_maps=[], without_creating=True)
0612         self.logger.debug(log_pre + "work get_processing: %s" % processing)
0613         processing_model = core_processings.get_processing(request_id=transform['request_id'], transform_id=transform['transform_id'])
0614         if processing_model:
0615             work.sync_processing(processing, processing_model)
0616             proc = processing_model['processing_metadata']['processing']
0617             work.sync_work_data(status=processing_model['status'], substatus=processing_model['substatus'],
0618                                 work=proc.work, output_data=processing_model['output_metadata'], processing=proc)
0619             # processing_metadata = processing_model['processing_metadata']
0620             if processing_model['errors']:
0621                 work.set_terminated_msg(processing_model['errors'])
0622             # work.set_processing_output_metadata(processing, processing_model['output_metadata'])
0623             work.set_output_data(processing.output_data)
0624             transform['workload_id'] = processing_model['workload_id']
0625         else:
0626             # create processing
0627             processing = work.get_processing(input_output_maps=[], without_creating=False)
0628             self.logger.debug(log_pre + "work get_processing with creating: %s" % processing)
0629             if processing and not processing.processing_id:
0630                 new_processing_model = self.generate_processing_model(transform)
0631 
0632                 proc_work = copy.deepcopy(work)
0633                 proc_work.clean_work()
0634                 processing.work = proc_work
0635                 new_processing_model['processing_metadata'] = {'processing': processing}
0636 
0637         transform_parameters = {'status': TransformStatus.Transforming,
0638                                 'locking': TransformLocking.Idle,
0639                                 'workload_id': transform['workload_id'],
0640                                 'transform_metadata': transform['transform_metadata']}
0641 
0642         transform_parameters = self.load_poll_period(transform, transform_parameters)
0643 
0644         if new_processing_model is not None:
0645             if 'new_poll_period' in transform_parameters:
0646                 new_processing_model['new_poll_period'] = transform_parameters['new_poll_period']
0647             if 'update_poll_period' in transform_parameters:
0648                 new_processing_model['update_poll_period'] = transform_parameters['update_poll_period']
0649             if 'max_new_retries' in transform_parameters:
0650                 new_processing_model['max_new_retries'] = transform_parameters['max_new_retries']
0651             if 'max_update_retries' in transform_parameters:
0652                 new_processing_model['max_update_retries'] = transform_parameters['max_update_retries']
0653 
0654         ret = {'transform': transform,
0655                'transform_parameters': transform_parameters,
0656                'new_processing': new_processing_model
0657                }
0658         return ret
0659 
0660     def handle_new_transform(self, transform, check_previous=False):
0661         """
0662         Process new transform
0663         """
0664         try:
0665             log_pre = self.get_log_prefix(transform)
0666             work = transform['transform_metadata']['work']
0667             if not check_previous:
0668                 ret = self.handle_new_transform_real(transform)
0669             else:
0670                 pre_works_are_ok = True
0671                 if work.parent_internal_id is not None:
0672                     parent_internal_ids = work.parent_internal_id.split(",")
0673                     tfs = core_transforms.get_transforms(request_id=transform['request_id'], internal_ids=parent_internal_ids, loop_index=transform['loop_index'])
0674                     if not tfs:
0675                         pre_works_are_ok = False
0676                     else:
0677                         for tf in tfs:
0678                             if not tf['workload_id']:
0679                                 pre_works_are_ok = False
0680                 if pre_works_are_ok:
0681                     ret = self.handle_new_transform_real(transform)
0682                 else:
0683                     # not update the status
0684                     transform_parameters = {'locking': TransformLocking.Idle}
0685                     ret = {'transform': transform,
0686                            'transform_parameters': transform_parameters,
0687                            }
0688             self.logger.info(log_pre + "handle_new_transform result: %s" % str(ret))
0689         except Exception as ex:
0690             self.logger.error(ex)
0691             self.logger.error(traceback.format_exc())
0692             retries = transform['new_retries'] + 1
0693             if not transform['max_new_retries'] or retries < transform['max_new_retries']:
0694                 tf_status = transform['status']
0695             else:
0696                 tf_status = TransformStatus.Failed
0697 
0698             # increase poll period
0699             new_poll_period = int(transform['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0700             if new_poll_period > self.max_new_poll_period:
0701                 new_poll_period = self.max_new_poll_period
0702 
0703             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0704 
0705             transform_parameters = {'status': tf_status,
0706                                     'new_retries': retries,
0707                                     'new_poll_period': new_poll_period,
0708                                     'errors': transform['errors'] if transform['errors'] else {},
0709                                     'locking': TransformLocking.Idle}
0710             transform_parameters['errors'].update(error)
0711             ret = {'transform': transform, 'transform_parameters': transform_parameters}
0712             self.logger.info(log_pre + "handle_new_transform exception result: %s" % str(ret))
0713         return ret
0714 
0715     def generate_collection(self, transform, collection, relation_type=CollectionRelationType.Input):
0716         coll = {'transform_id': transform['transform_id'],
0717                 'request_id': transform['request_id'],
0718                 'workload_id': transform['workload_id'],
0719                 'coll_type': CollectionType.Dataset,
0720                 'scope': collection['scope'],
0721                 'name': collection['name'][:254],
0722                 'relation_type': relation_type,
0723                 'bytes': 0,
0724                 'total_files': 0,
0725                 'new_files': 0,
0726                 'processed_files': 0,
0727                 'processing_files': 0,
0728                 'coll_metadata': None,
0729                 'status': CollectionStatus.Open,
0730                 'expired_at': transform['expired_at']}
0731         return coll
0732 
0733     def handle_new_itransform_real(self, transform):
0734         """
0735         Process new transform
0736         """
0737         log_pre = self.get_log_prefix(transform)
0738         self.logger.info(log_pre + "handle_new_itransform: transform_id: %s" % transform['transform_id'])
0739 
0740         work = transform['transform_metadata']['work']
0741         if work.workflow_type in [WorkflowType.iWork]:
0742             work.transform_id = transform['transform_id']
0743 
0744         # create processing
0745         new_processing_model = self.generate_processing_model(transform)
0746         new_processing_model['processing_metadata'] = {'work': work}
0747 
0748         transform_parameters = {'status': TransformStatus.Transforming,
0749                                 'locking': TransformLocking.Idle,
0750                                 'workload_id': transform['workload_id']}
0751 
0752         transform_parameters = self.load_poll_period(transform, transform_parameters)
0753 
0754         if new_processing_model is not None:
0755             if 'new_poll_period' in transform_parameters:
0756                 new_processing_model['new_poll_period'] = transform_parameters['new_poll_period']
0757             if 'update_poll_period' in transform_parameters:
0758                 new_processing_model['update_poll_period'] = transform_parameters['update_poll_period']
0759             if 'max_new_retries' in transform_parameters:
0760                 new_processing_model['max_new_retries'] = transform_parameters['max_new_retries']
0761             if 'max_update_retries' in transform_parameters:
0762                 new_processing_model['max_update_retries'] = transform_parameters['max_update_retries']
0763 
0764         func_name = work.get_func_name()
0765         func_name = func_name.split(':')[-1]
0766         input_coll = {'scope': 'pseudo_dataset', 'name': 'pseudo_input_%s' % func_name}
0767         output_coll = {'scope': 'pseudo_dataset', 'name': 'pseudo_output_%s' % func_name}
0768 
0769         input_collection = self.generate_collection(transform, input_coll, relation_type=CollectionRelationType.Input)
0770         output_collection = self.generate_collection(transform, output_coll, relation_type=CollectionRelationType.Output)
0771 
0772         ret = {'transform': transform,
0773                'transform_parameters': transform_parameters,
0774                'new_processing': new_processing_model,
0775                'input_collections': [input_collection],
0776                'output_collections': [output_collection]
0777                }
0778         return ret
0779 
0780     def handle_new_itransform(self, transform, check_previous=False):
0781         """
0782         Process new transform
0783         """
0784         try:
0785             log_pre = self.get_log_prefix(transform)
0786             work = transform['transform_metadata']['work']
0787             pre_works_are_ok = True
0788             pre_workload_id = None
0789             if work.parent_internal_id is not None:
0790                 parent_internal_ids = work.parent_internal_id.split(",")
0791                 tfs = core_transforms.get_transforms(request_id=transform['request_id'], internal_ids=parent_internal_ids, loop_index=transform['loop_index'])
0792                 self.logger.info(log_pre + f"handle_new_itransform parent_internal_id {work.parent_internal_id}, parent_transforms: {tfs}")
0793                 if not tfs:
0794                     pre_works_are_ok = False
0795                 else:
0796                     for tf in tfs:
0797                         if not tf['workload_id']:
0798                             pre_works_are_ok = False
0799                         pre_workload_id = tf['workload_id']
0800             if pre_workload_id:
0801                 transform['transform_metadata']['work'].parent_workload_id = pre_workload_id
0802             if pre_works_are_ok:
0803                 ret = self.handle_new_itransform_real(transform)
0804             else:
0805                 # not update the status
0806                 transform_parameters = {'locking': TransformLocking.Idle}
0807                 ret = {'transform': transform,
0808                        'transform_parameters': transform_parameters,
0809                        }
0810 
0811             self.logger.info(log_pre + "handle_new_itransform result: %s" % str(ret))
0812         except Exception as ex:
0813             self.logger.error(ex)
0814             self.logger.error(traceback.format_exc())
0815             retries = transform['new_retries'] + 1
0816             if not transform['max_new_retries'] or retries < transform['max_new_retries']:
0817                 tf_status = transform['status']
0818             else:
0819                 tf_status = TransformStatus.Failed
0820 
0821             # increase poll period
0822             new_poll_period = int(transform['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0823             if new_poll_period > self.max_new_poll_period:
0824                 new_poll_period = self.max_new_poll_period
0825 
0826             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0827 
0828             transform_parameters = {'status': tf_status,
0829                                     'new_retries': retries,
0830                                     'new_poll_period': new_poll_period,
0831                                     'errors': transform['errors'] if transform['errors'] else {},
0832                                     'locking': TransformLocking.Idle}
0833             transform_parameters['errors'].update(error)
0834             ret = {'transform': transform, 'transform_parameters': transform_parameters}
0835             self.logger.info(log_pre + "handle_new_itransform exception result: %s" % str(ret))
0836         return ret
0837 
0838     def handle_new_generic_transform(self, transform):
0839         """
0840         Process new transform
0841         """
0842         try:
0843             log_pre = self.get_log_prefix(transform)
0844             if transform['transform_type'] in [TransformType.GenericWorkflow]:
0845                 ret = self.handle_new_generic_transform_real(transform)
0846             elif transform['transform_type'] in [TransformType.GenericWork]:
0847                 ret = self.handle_new_generic_transform_real(transform)
0848             self.logger.info(log_pre + "handle_new_generic_transform result: %s" % str(ret))
0849         except Exception as ex:
0850             self.logger.error(ex)
0851             self.logger.error(traceback.format_exc())
0852             retries = transform['new_retries'] + 1
0853             if not transform['max_new_retries'] or retries < transform['max_new_retries']:
0854                 tf_status = transform['status']
0855             else:
0856                 tf_status = TransformStatus.Failed
0857 
0858             # increase poll period
0859             new_poll_period = int(transform['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0860             if new_poll_period > self.max_new_poll_period:
0861                 new_poll_period = self.max_new_poll_period
0862 
0863             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0864 
0865             transform_parameters = {'status': tf_status,
0866                                     'new_retries': retries,
0867                                     'new_poll_period': new_poll_period,
0868                                     'errors': transform['errors'] if transform['errors'] else {},
0869                                     'locking': TransformLocking.Idle}
0870             transform_parameters['errors'].update(error)
0871             ret = {'transform': transform, 'transform_parameters': transform_parameters}
0872             self.logger.info(log_pre + "handle_new_generic_transform exception result: %s" % str(ret))
0873         return ret
0874 
0875     def update_transform(self, ret):
0876         new_pr_ids, update_pr_ids = [], []
0877         try:
0878             if ret:
0879                 log_pre = self.get_log_prefix(ret['transform'])
0880                 self.logger.info(log_pre + "Update transform: %s" % str(ret))
0881 
0882                 ret['transform_parameters']['locking'] = TransformLocking.Idle
0883                 ret['transform_parameters']['updated_at'] = datetime.datetime.utcnow()
0884 
0885                 retry = True
0886                 retry_num = 0
0887                 while retry:
0888                     retry = False
0889                     retry_num += 1
0890                     try:
0891                         # self.logger.debug("wen: %s" % str(ret['output_contents']))
0892                         new_pr_ids, update_pr_ids = core_transforms.add_transform_outputs(transform=ret['transform'],
0893                                                                                           transform_parameters=ret['transform_parameters'],
0894                                                                                           input_collections=ret.get('input_collections', None),
0895                                                                                           output_collections=ret.get('output_collections', None),
0896                                                                                           log_collections=ret.get('log_collections', None),
0897                                                                                           new_contents=ret.get('new_contents', None),
0898                                                                                           update_input_collections=ret.get('update_input_collections', None),
0899                                                                                           update_output_collections=ret.get('update_output_collections', None),
0900                                                                                           update_log_collections=ret.get('update_log_collections', None),
0901                                                                                           update_contents=ret.get('update_contents', None),
0902                                                                                           messages=ret.get('messages', None),
0903                                                                                           update_messages=ret.get('update_messages', None),
0904                                                                                           new_processing=ret.get('new_processing', None),
0905                                                                                           update_processing=ret.get('update_processing', None),
0906                                                                                           message_bulk_size=self.message_bulk_size)
0907                     except exceptions.DatabaseException as ex:
0908                         if 'ORA-00060' in str(ex):
0909                             self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0910                             if retry_num < 5:
0911                                 retry = True
0912                                 if retry_num <= 1:
0913                                     random_sleep = random.randint(1, 10)
0914                                 elif retry_num <= 2:
0915                                     random_sleep = random.randint(1, 60)
0916                                 else:
0917                                     random_sleep = random.randint(1, 120)
0918                                 time.sleep(random_sleep)
0919                             else:
0920                                 raise ex
0921                         else:
0922                             raise ex
0923                             # self.logger.error(ex)
0924                             # self.logger.error(traceback.format_exc())
0925         except Exception as ex:
0926             self.logger.error(ex)
0927             self.logger.error(traceback.format_exc())
0928             try:
0929                 transform_parameters = {'status': TransformStatus.Transforming,
0930                                         'locking': TransformLocking.Idle}
0931                 if 'new_retries' in ret['transform_parameters']:
0932                     transform_parameters['new_retries'] = ret['transform_parameters']['new_retries']
0933                 if 'update_retries' in ret['transform_parameters']:
0934                     transform_parameters['update_retries'] = ret['transform_parameters']['update_retries']
0935                 if 'errors' in ret['transform_parameters']:
0936                     transform_parameters['errors'] = ret['transform_parameters']['errors']
0937 
0938                 log_pre = self.get_log_prefix(ret['transform'])
0939                 self.logger.warn(log_pre + "update transform exception result: %s" % str(transform_parameters))
0940 
0941                 new_pr_ids, update_pr_ids = core_transforms.add_transform_outputs(transform=ret['transform'],
0942                                                                                   transform_parameters=transform_parameters)
0943             except Exception as ex:
0944                 self.logger.error(ex)
0945                 self.logger.error(traceback.format_exc())
0946         return new_pr_ids, update_pr_ids
0947 
0948     def process_new_transform(self, event=None, transform=None):
0949         self.number_workers += 1
0950         try:
0951             if transform is None and event:
0952                 tf_status = [TransformStatus.New, TransformStatus.Ready, TransformStatus.Extend]
0953                 tf = self.get_transform(transform_id=event._transform_id, status=tf_status, locking=True)
0954                 if not tf:
0955                     self.logger.warn("Cannot find transform for event: %s" % str(event))
0956                 else:
0957                     transform = tf
0958             if transform:
0959                 tf = transform
0960                 log_pre = self.get_log_prefix(tf)
0961                 self.logger.info(log_pre + "process_new_transform")
0962                 if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]:
0963                     ret = self.handle_new_itransform(tf)
0964                 elif tf['transform_type'] in [TransformType.GenericWorkflow, TransformType.GenericWork]:
0965                     ret = self.handle_new_generic_transform(tf)
0966                 else:
0967                     ret = self.handle_new_transform(tf)
0968                 self.logger.info(log_pre + "process_new_transform result: %s" % str(ret))
0969 
0970                 new_pr_ids, update_pr_ids = self.update_transform(ret)
0971                 for pr_id in new_pr_ids:
0972                     self.logger.info(log_pre + "NewProcessingEvent(processing_id: %s)" % pr_id)
0973                     event = NewProcessingEvent(publisher_id=self.id, processing_id=pr_id)
0974                     self.event_bus.send(event)
0975                 for pr_id in update_pr_ids:
0976                     self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % pr_id)
0977                     event = UpdateProcessingEvent(publisher_id=self.id, processing_id=pr_id)
0978                     self.event_bus.send(event)
0979         except Exception as ex:
0980             self.logger.error(ex)
0981             self.logger.error(traceback.format_exc())
0982         self.number_workers -= 1
0983 
0984     def handle_update_transform_real(self, transform, event):
0985         """
0986         process running transforms
0987         """
0988         log_pre = self.get_log_prefix(transform)
0989 
0990         self.logger.info(log_pre + "handle_update_transform: transform_id: %s" % transform['transform_id'])
0991 
0992         is_terminated = False
0993         to_abort = False
0994         if (event and event._content and 'cmd_type' in event._content and event._content['cmd_type']
0995             and event._content['cmd_type'] in [CommandType.AbortRequest, CommandType.ExpireRequest]):      # noqa W503
0996             to_abort = True
0997             self.logger.info(log_pre + "to_abort %s" % to_abort)
0998 
0999         work = transform['transform_metadata']['work']
1000         work.set_work_id(transform['transform_id'])
1001         work.set_agent_attributes(self.agent_attributes, transform)
1002 
1003         work_name_to_coll_map = core_transforms.get_work_name_to_coll_map(request_id=transform['request_id'])
1004         work.set_work_name_to_coll_map(work_name_to_coll_map)
1005 
1006         # link processings
1007         new_processing_model, processing_model = None, None
1008         ret_processing_id = None
1009 
1010         processing = work.get_processing(input_output_maps=[], without_creating=True)
1011         self.logger.debug(log_pre + "work get_processing: %s" % processing)
1012         if processing and processing.processing_id:
1013             ret_processing_id = processing.processing_id
1014             processing_model = core_processings.get_processing(processing_id=processing.processing_id)
1015             work.sync_processing(processing, processing_model)
1016             proc = processing_model['processing_metadata']['processing']
1017             work.sync_work_data(status=processing_model['status'], substatus=processing_model['substatus'],
1018                                 work=proc.work, output_data=processing_model['output_metadata'], processing=proc)
1019             # processing_metadata = processing_model['processing_metadata']
1020             if processing_model['errors']:
1021                 work.set_terminated_msg(processing_model['errors'])
1022             # work.set_processing_output_metadata(processing, processing_model['output_metadata'])
1023             work.set_output_data(processing.output_data)
1024             transform['workload_id'] = processing_model['workload_id']
1025         else:
1026             if not processing:
1027                 processing = work.get_processing(input_output_maps=[], without_creating=False)
1028                 self.logger.debug(log_pre + "work get_processing with creating: %s" % processing)
1029             new_processing_model = self.generate_processing_model(transform)
1030 
1031             proc_work = copy.deepcopy(work)
1032             proc_work.clean_work()
1033             processing.work = proc_work
1034             new_processing_model['processing_metadata'] = {'processing': processing}
1035 
1036         self.logger.info(log_pre + "syn_work_status: %s, transform status: %s" % (transform['transform_id'], transform['status']))
1037         if work.is_terminated():
1038             is_terminated = True
1039             self.logger.info(log_pre + "Transform(%s) work is terminated: work status: %s" % (transform['transform_id'], work.get_status()))
1040             if work.is_finished():
1041                 transform['status'] = TransformStatus.Finished
1042             else:
1043                 if to_abort:
1044                     transform['status'] = TransformStatus.Cancelled
1045                 elif work.is_subfinished():
1046                     transform['status'] = TransformStatus.SubFinished
1047                 elif work.is_failed():
1048                     transform['status'] = TransformStatus.Failed
1049                 else:
1050                     transform['status'] = TransformStatus.Failed
1051 
1052         transform_parameters = {'status': transform['status'],
1053                                 'locking': TransformLocking.Idle,
1054                                 'workload_id': transform['workload_id'],
1055                                 'transform_metadata': transform['transform_metadata']}
1056         transform_parameters = self.load_poll_period(transform, transform_parameters)
1057 
1058         if new_processing_model is not None:
1059             if 'new_poll_period' in transform_parameters:
1060                 new_processing_model['new_poll_period'] = transform_parameters['new_poll_period']
1061             if 'update_poll_period' in transform_parameters:
1062                 new_processing_model['update_poll_period'] = transform_parameters['update_poll_period']
1063             if 'max_new_retries' in transform_parameters:
1064                 new_processing_model['max_new_retries'] = transform_parameters['max_new_retries']
1065             if 'max_update_retries' in transform_parameters:
1066                 new_processing_model['max_update_retries'] = transform_parameters['max_update_retries']
1067 
1068         ret = {'transform': transform,
1069                'transform_parameters': transform_parameters,
1070                'new_processing': new_processing_model}
1071         return ret, is_terminated, ret_processing_id
1072 
1073     def handle_update_transform(self, transform, event):
1074         """
1075         Process running transform
1076         """
1077         try:
1078             log_pre = self.get_log_prefix(transform)
1079 
1080             self.logger.info(log_pre + "handle_update_transform: %s" % transform)
1081             ret, is_terminated, ret_processing_id = self.handle_update_transform_real(transform, event)
1082             self.logger.info(log_pre + "handle_update_transform result: %s" % str(ret))
1083             return ret, is_terminated, ret_processing_id
1084         except Exception as ex:
1085             self.logger.error(ex)
1086             self.logger.error(traceback.format_exc())
1087 
1088             retries = transform['update_retries'] + 1
1089             if not transform['max_update_retries'] or retries < transform['max_update_retries']:
1090                 tf_status = transform['status']
1091             else:
1092                 tf_status = TransformStatus.Failed
1093             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1094 
1095             # increase poll period
1096             update_poll_period = int(transform['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1097             if update_poll_period > self.max_update_poll_period:
1098                 update_poll_period = self.max_update_poll_period
1099 
1100             transform_parameters = {'status': tf_status,
1101                                     'update_retries': retries,
1102                                     'update_poll_period': update_poll_period,
1103                                     'errors': transform['errors'] if transform['errors'] else {},
1104                                     'locking': TransformLocking.Idle}
1105             transform_parameters['errors'].update(error)
1106 
1107             ret = {'transform': transform, 'transform_parameters': transform_parameters}
1108             self.logger.warn(log_pre + "handle_update_transform exception result: %s" % str(ret))
1109         return ret, False, None
1110 
1111     def handle_update_itransform_real(self, transform, event):
1112         """
1113         process running transforms
1114         """
1115         log_pre = self.get_log_prefix(transform)
1116 
1117         self.logger.info(log_pre + "handle_update_itransform: transform_id: %s" % transform['transform_id'])
1118 
1119         # work = transform['transform_metadata']['work']
1120 
1121         prs = core_processings.get_processings(transform_id=transform['transform_id'])
1122         pr = None
1123         for pr in prs:
1124             if pr['processing_id'] == transform['current_processing_id']:
1125                 transform['workload_id'] = pr['workload_id']
1126                 break
1127 
1128         errors = None
1129         if pr:
1130             transform['status'] = get_transform_status_from_processing_status(pr['status'])
1131             log_msg = log_pre + "transform id: %s, transform status: %s" % (transform['transform_id'], transform['status'])
1132             log_msg = log_msg + ", processing id: %s, processing status: %s" % (pr['processing_id'], pr['status'])
1133             self.logger.info(log_msg)
1134         else:
1135             transform['status'] = TransformStatus.Failed
1136             log_msg = log_pre + "transform id: %s, transform status: %s" % (transform['transform_id'], transform['status'])
1137             log_msg = log_msg + ", no attached processings."
1138             self.logger.error(log_msg)
1139             errors = {'submit_err': 'no attached processings'}
1140 
1141         is_terminated = False
1142         if transform['status'] in [TransformStatus.Finished, TransformStatus.Failed, TransformStatus.Cancelled,
1143                                    TransformStatus.SubFinished, TransformStatus.Suspended, TransformStatus.Expired]:
1144             is_terminated = True
1145 
1146         transform_parameters = {'status': transform['status'],
1147                                 'locking': TransformLocking.Idle,
1148                                 'workload_id': transform['workload_id']}
1149         transform_parameters = self.load_poll_period(transform, transform_parameters)
1150         if errors:
1151             transform_parameters['errors'] = errors
1152 
1153         ret = {'transform': transform,
1154                'transform_parameters': transform_parameters}
1155         return ret, is_terminated, None
1156 
1157     def handle_update_itransform(self, transform, event):
1158         """
1159         Process running transform
1160         """
1161         try:
1162             log_pre = self.get_log_prefix(transform)
1163 
1164             self.logger.info(log_pre + "handle_update_itransform: %s" % transform)
1165             ret, is_terminated, ret_processing_id = self.handle_update_itransform_real(transform, event)
1166             self.logger.info(log_pre + "handle_update_itransform result: %s" % str(ret))
1167             return ret, is_terminated, ret_processing_id
1168         except Exception as ex:
1169             self.logger.error(ex)
1170             self.logger.error(traceback.format_exc())
1171 
1172             retries = transform['update_retries'] + 1
1173             if not transform['max_update_retries'] or retries < transform['max_update_retries']:
1174                 tf_status = transform['status']
1175             else:
1176                 tf_status = TransformStatus.Failed
1177             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1178 
1179             # increase poll period
1180             update_poll_period = int(transform['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1181             if update_poll_period > self.max_update_poll_period:
1182                 update_poll_period = self.max_update_poll_period
1183 
1184             transform_parameters = {'status': tf_status,
1185                                     'update_retries': retries,
1186                                     'update_poll_period': update_poll_period,
1187                                     'errors': transform['errors'] if transform['errors'] else {},
1188                                     'locking': TransformLocking.Idle}
1189             transform_parameters['errors'].update(error)
1190 
1191             ret = {'transform': transform, 'transform_parameters': transform_parameters}
1192             self.logger.warn(log_pre + "handle_update_itransform exception result: %s" % str(ret))
1193         return ret, False, None
1194 
1195     def handle_update_generic_transform(self, transform, event):
1196         """
1197         Process running transform
1198         """
1199         try:
1200             log_pre = self.get_log_prefix(transform)
1201 
1202             self.logger.info(log_pre + "handle_update_generic_transform: %s" % transform)
1203             ret, is_terminated, ret_processing_id = self.handle_update_generic_transform_real(transform, event)
1204             self.logger.info(log_pre + "handle_update_generic_transform result: %s" % str(ret))
1205             return ret, is_terminated, ret_processing_id
1206         except Exception as ex:
1207             self.logger.error(ex)
1208             self.logger.error(traceback.format_exc())
1209 
1210             retries = transform['update_retries'] + 1
1211             if not transform['max_update_retries'] or retries < transform['max_update_retries']:
1212                 tf_status = transform['status']
1213             else:
1214                 tf_status = TransformStatus.Failed
1215             error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1216 
1217             # increase poll period
1218             update_poll_period = int(transform['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1219             if update_poll_period > self.max_update_poll_period:
1220                 update_poll_period = self.max_update_poll_period
1221 
1222             transform_parameters = {'status': tf_status,
1223                                     'update_retries': retries,
1224                                     'update_poll_period': update_poll_period,
1225                                     'errors': transform['errors'] if transform['errors'] else {},
1226                                     'locking': TransformLocking.Idle}
1227             transform_parameters['errors'].update(error)
1228 
1229             ret = {'transform': transform, 'transform_parameters': transform_parameters}
1230             self.logger.warn(log_pre + "handle_update_generic_transform exception result: %s" % str(ret))
1231         return ret, False, None
1232 
1233     def process_update_transform(self, event=None, transform=None):
1234         self.number_workers += 1
1235         pro_ret = ReturnCode.Ok.value
1236         try:
1237             if transform is None and event:
1238                 # tf_status = [TransformStatus.Transforming,
1239                 #              TransformStatus.ToCancel, TransformStatus.Cancelling,
1240                 #              TransformStatus.ToSuspend, TransformStatus.Suspending,
1241                 #              TransformStatus.ToExpire, TransformStatus.Expiring,
1242                 #              TransformStatus.ToResume, TransformStatus.Resuming,
1243                 #              TransformStatus.ToFinish, TransformStatus.ToForceFinish]
1244                 # tf = self.get_transform(transform_id=event._transform_id, status=tf_status, locking=True)
1245                 tf = self.get_transform(transform_id=event._transform_id, locking=True)
1246                 if not tf:
1247                     self.logger.error("Cannot find transform for event: %s" % str(event))
1248                     pro_ret = ReturnCode.Locked.value
1249                 else:
1250                     transform = tf
1251             if transform:
1252                 tf = transform
1253                 log_pre = self.get_log_prefix(tf)
1254 
1255                 if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]:
1256                     ret, is_terminated, ret_processing_id = self.handle_update_itransform(tf, event)
1257                 elif tf['transform_type'] in [TransformType.GenericWorkflow, TransformType.GenericWork]:
1258                     ret, is_terminated, ret_processing_id = self.handle_update_generic_transform(tf, event)
1259                 else:
1260                     ret, is_terminated, ret_processing_id = self.handle_update_transform(tf, event)
1261                 new_pr_ids, update_pr_ids = self.update_transform(ret)
1262 
1263                 has_update_workload_id = False
1264                 new_workload_id = ret.get('transform_parameters', {}).get('workload_id', None)
1265                 if new_workload_id and tf['workload_id'] != new_workload_id:
1266                     has_update_workload_id = True
1267                 if has_update_workload_id or is_terminated:
1268                     self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % tf['request_id'])
1269                     event = UpdateRequestEvent(publisher_id=self.id, request_id=tf['request_id'])
1270                     self.event_bus.send(event)
1271                 for pr_id in new_pr_ids:
1272                     self.logger.info(log_pre + "NewProcessingEvent(processing_id: %s)" % pr_id)
1273                     event = NewProcessingEvent(publisher_id=self.id, processing_id=pr_id)
1274                     self.event_bus.send(event)
1275                 for pr_id in update_pr_ids:
1276                     self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % pr_id)
1277                     event = UpdateProcessingEvent(publisher_id=self.id, processing_id=pr_id)
1278                     self.event_bus.send(event)
1279                 # if ret_processing_id:
1280                 #     self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % ret_processing_id)
1281                 #     event = UpdateProcessingEvent(publisher_id=self.id, processing_id=ret_processing_id)
1282                 #     self.event_bus.send(event)
1283         except Exception as ex:
1284             self.logger.error(ex)
1285             self.logger.error(traceback.format_exc())
1286             pro_ret = ReturnCode.Failed.value
1287         self.number_workers -= 1
1288         return pro_ret
1289 
1290     def handle_abort_transform(self, transform):
1291         """
1292         process abort transform
1293         """
1294         try:
1295             work = transform['transform_metadata']['work']
1296             work.set_agent_attributes(self.agent_attributes, transform)
1297 
1298             # save old status for retry
1299             oldstatus = transform['status']
1300 
1301             processing = work.get_processing(input_output_maps=[], without_creating=True)
1302             if processing and processing.processing_id:
1303                 tf_status = TransformStatus.Cancelling
1304             else:
1305                 tf_status = TransformStatus.Cancelled
1306 
1307             transform_parameters = {'status': tf_status,
1308                                     'oldstatus': oldstatus,
1309                                     'locking': TransformLocking.Idle,
1310                                     'transform_metadata': transform['transform_metadata']}
1311             ret = {'transform': transform, 'transform_parameters': transform_parameters}
1312             return ret
1313         except Exception as ex:
1314             self.logger.error(ex)
1315             self.logger.error(traceback.format_exc())
1316             error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1317             transform_parameters = {'status': tf_status,
1318                                     'locking': TransformLocking.Idle,
1319                                     'errors': transform['errors'] if transform['errors'] else {}}
1320             transform_parameters['errors'].update(error)
1321             ret = {'transform': transform, 'transform_parameters': transform_parameters}
1322             return ret
1323         return None
1324 
1325     def handle_abort_itransform(self, transform, event):
1326         """
1327         process abort transform
1328         """
1329         try:
1330             log_pre = self.get_log_prefix(transform)
1331 
1332             self.logger.info(log_pre + "handle_abort_itransform: %s" % transform)
1333             prs = core_processings.get_processings(transform_id=transform['transform_id'])
1334             pr_found = None
1335             for pr in prs:
1336                 if pr['processing_id'] == transform['current_processing_id']:
1337                     pr_found = pr
1338                     break
1339             if pr_found:
1340                 self.logger.info(log_pre + "AbortProcessingEvent(processing_id: %s)" % pr['processing_id'])
1341                 event = AbortProcessingEvent(publisher_id=self.id,
1342                                              processing_id=pr['processing_id'],
1343                                              content=event._content if event else None)
1344                 self.event_bus.send(event)
1345 
1346             transform_parameters = {'status': TransformStatus.Transforming,
1347                                     'substatus': TransformStatus.ToCancel,
1348                                     'locking': TransformLocking.Idle}
1349 
1350             ret = {'transform': transform,
1351                    'transform_parameters': transform_parameters}
1352             return ret
1353         except Exception as ex:
1354             self.logger.error(ex)
1355             self.logger.error(traceback.format_exc())
1356             tf_status = transform['oldstatus']
1357             error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1358             transform_parameters = {'status': tf_status,
1359                                     'locking': TransformLocking.Idle,
1360                                     'errors': transform['errors'] if transform['errors'] else {}}
1361             transform_parameters['errors'].update(error)
1362             ret = {'transform': transform, 'transform_parameters': transform_parameters}
1363             return ret
1364         return None
1365 
1366     def process_abort_transform(self, event):
1367         self.number_workers += 1
1368         pro_ret = ReturnCode.Ok.value
1369         try:
1370             if event:
1371                 self.logger.info("process_abort_transform: event: %s" % event)
1372                 tf = self.get_transform(transform_id=event._transform_id, locking=True)
1373                 if not tf:
1374                     self.logger.error("Cannot find transform for event: %s" % str(event))
1375                     pro_ret = ReturnCode.Locked.value
1376                 else:
1377                     log_pre = self.get_log_prefix(tf)
1378                     self.logger.info(log_pre + "process_abort_transform")
1379 
1380                     if tf['status'] in [TransformStatus.Finished, TransformStatus.SubFinished,
1381                                         TransformStatus.Failed, TransformStatus.Cancelled,
1382                                         TransformStatus.Suspended, TransformStatus.Expired]:
1383                         ret = {'transform': tf,
1384                                'transform_parameters': {'locking': TransformLocking.Idle,
1385                                                         'errors': {'extra_msg': "Transform is already terminated. Cannot be aborted"}}}
1386                         if tf['errors'] and 'msg' in tf['errors']:
1387                             ret['parameters']['errors']['msg'] = tf['errors']['msg']
1388 
1389                         self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret))
1390 
1391                         self.update_transform(ret)
1392                     else:
1393                         if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]:
1394                             ret = self.handle_abort_itransform(tf, event)
1395                             self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret))
1396                             if ret:
1397                                 self.update_transform(ret)
1398                         else:
1399                             ret = self.handle_abort_transform(tf)
1400                             self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret))
1401                             if ret:
1402                                 self.update_transform(ret)
1403 
1404                             work = tf['transform_metadata']['work']
1405                             work.set_work_id(tf['transform_id'])
1406                             work.set_agent_attributes(self.agent_attributes, tf)
1407 
1408                             processing = work.get_processing(input_output_maps=[], without_creating=True)
1409                             if processing and processing.processing_id:
1410                                 self.logger.info(log_pre + "AbortProcessingEvent(processing_id: %s)" % processing.processing_id)
1411                                 event = AbortProcessingEvent(publisher_id=self.id, processing_id=processing.processing_id, content=event._content if event else None)
1412                                 self.event_bus.send(event)
1413         except Exception as ex:
1414             self.logger.error(ex)
1415             self.logger.error(traceback.format_exc())
1416             pro_ret = ReturnCode.Failed.value
1417         self.number_workers -= 1
1418         return pro_ret
1419 
1420     def handle_resume_transform(self, transform):
1421         """
1422         process resume transform
1423         """
1424         try:
1425             work = transform['transform_metadata']['work']
1426             work.set_agent_attributes(self.agent_attributes, transform)
1427 
1428             tf_status = transform['oldstatus']
1429 
1430             transform_parameters = {'status': tf_status,
1431                                     'locking': TransformLocking.Idle}
1432 
1433             ret = {'transform': transform,
1434                    'transform_parameters': transform_parameters}
1435             return ret
1436         except Exception as ex:
1437             self.logger.error(ex)
1438             self.logger.error(traceback.format_exc())
1439             error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1440             transform_parameters = {'status': tf_status,
1441                                     'locking': TransformLocking.Idle,
1442                                     'errors': transform['errors'] if transform['errors'] else {}}
1443             transform_parameters['errors'].update(error)
1444             ret = {'transform': transform, 'transform_parameters': transform_parameters}
1445             return ret
1446         return None
1447 
1448     def handle_resume_itransform(self, transform, event):
1449         """
1450         process resume transform
1451         """
1452         try:
1453             log_pre = self.get_log_prefix(transform)
1454 
1455             self.logger.info(log_pre + "handle_resume_itransform: %s" % transform)
1456             prs = core_processings.get_processings(transform_id=transform['transform_id'])
1457             pr_found = None
1458             for pr in prs:
1459                 if pr['processing_id'] == transform['current_processing_id']:
1460                     pr_found = pr
1461                     break
1462 
1463             if pr_found:
1464                 self.logger.info(log_pre + "ResumeProcessingEvent(processing_id: %s)" % pr['processing_id'])
1465                 event = ResumeProcessingEvent(publisher_id=self.id,
1466                                               processing_id=pr['processing_id'],
1467                                               content=event._content if event else None)
1468                 self.event_bus.send(event)
1469 
1470             transform_parameters = {'status': TransformStatus.Transforming,
1471                                     'substatus': TransformStatus.ToResume,
1472                                     'locking': TransformLocking.Idle}
1473 
1474             ret = {'transform': transform,
1475                    'transform_parameters': transform_parameters}
1476             return ret
1477         except Exception as ex:
1478             self.logger.error(ex)
1479             self.logger.error(traceback.format_exc())
1480             tf_status = transform['oldstatus']
1481             error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1482             transform_parameters = {'status': tf_status,
1483                                     'locking': TransformLocking.Idle,
1484                                     'errors': transform['errors'] if transform['errors'] else {}}
1485             transform_parameters['errors'].update(error)
1486             ret = {'transform': transform, 'transform_parameters': transform_parameters}
1487             return ret
1488         return None
1489 
1490     def process_resume_transform(self, event):
1491         self.number_workers += 1
1492         pro_ret = ReturnCode.Ok.value
1493         try:
1494             if event:
1495                 self.logger.info("process_resume_transform: event: %s" % event)
1496                 tf = self.get_transform(transform_id=event._transform_id, locking=True)
1497                 if not tf:
1498                     self.logger.error("Cannot find transform for event: %s" % str(event))
1499                     pro_ret = ReturnCode.Locked.value
1500                 else:
1501                     log_pre = self.get_log_prefix(tf)
1502 
1503                     if tf['status'] in [TransformStatus.Finished]:
1504                         ret = {'transform': tf,
1505                                'transform_parameters': {'locking': TransformLocking.Idle,
1506                                                         'errors': {'extra_msg': "Transform is already finished. Cannot be resumed"}}}
1507                         if tf['errors'] and 'msg' in tf['errors']:
1508                             ret['parameters']['errors']['msg'] = tf['errors']['msg']
1509 
1510                         self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret))
1511                         self.update_transform(ret)
1512                     else:
1513                         if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]:
1514                             ret = self.handle_resume_itransform(tf, event)
1515                             self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret))
1516                             if ret:
1517                                 self.update_transform(ret)
1518                         else:
1519                             ret = self.handle_resume_transform(tf)
1520                             self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret))
1521                             if ret:
1522                                 self.update_transform(ret)
1523 
1524                             work = tf['transform_metadata']['work']
1525                             work.set_agent_attributes(self.agent_attributes, tf)
1526 
1527                             processing = work.get_processing(input_output_maps=[], without_creating=True)
1528                             if processing and processing.processing_id:
1529                                 self.logger.info(log_pre + "ResumeProcessingEvent(processing_id: %s)" % processing.processing_id)
1530                                 event = ResumeProcessingEvent(publisher_id=self.id,
1531                                                               processing_id=processing.processing_id,
1532                                                               content=event._content if event else None)
1533                                 self.event_bus.send(event)
1534                             else:
1535                                 self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % tf['transform_id'])
1536                                 event = UpdateTransformEvent(publisher_id=self.id,
1537                                                              transform_id=tf['transform_id'],
1538                                                              content=event._content if event else None)
1539                                 self.event_bus.send(event)
1540         except Exception as ex:
1541             self.logger.error(ex)
1542             self.logger.error(traceback.format_exc())
1543             pro_ret = ReturnCode.Failed.value
1544         self.number_workers -= 1
1545         return pro_ret
1546 
1547     def clean_locks(self, force=False):
1548         try:
1549             self.logger.info(f"clean locking: force: {force}")
1550             health_items = self.get_health_items()
1551             min_request_id = BaseAgent.min_request_id
1552             hostname, pid, thread_id, thread_name = self.get_process_thread_info()
1553             core_transforms.clean_locking(health_items=health_items, min_request_id=min_request_id,
1554                                           time_period=self.clean_locks_time_period,
1555                                           force=force, hostname=hostname, pid=pid)
1556         except Exception as ex:
1557             self.logger.info(f"Failed clean locking: {ex}")
1558 
1559     def init_event_function_map(self):
1560         self.event_func_map = {
1561             EventType.QueueTransform: {
1562                 'pre_check': self.is_ok_to_run_more_transforms,
1563                 'exec_func': self.process_queue_transform
1564             },
1565             EventType.NewTransform: {
1566                 'pre_check': self.is_ok_to_run_more_transforms,
1567                 'exec_func': self.process_new_transform
1568             },
1569             EventType.UpdateTransform: {
1570                 'pre_check': self.is_ok_to_run_more_transforms,
1571                 'exec_func': self.process_update_transform
1572             },
1573             EventType.AbortTransform: {
1574                 'pre_check': self.is_ok_to_run_more_transforms,
1575                 'exec_func': self.process_abort_transform
1576             },
1577             EventType.ResumeTransform: {
1578                 'pre_check': self.is_ok_to_run_more_transforms,
1579                 'exec_func': self.process_resume_transform
1580             }
1581         }
1582 
1583     def run(self):
1584         """
1585         Main run function.
1586         """
1587         try:
1588             self.logger.info("Starting main thread")
1589             self.init_thread_info()
1590 
1591             self.load_plugins()
1592 
1593             self.add_default_tasks()
1594             self.clean_locks(force=True)
1595 
1596             self.init_event_function_map()
1597 
1598             task = self.create_task(task_func=self.get_queue_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
1599             self.add_task(task)
1600             task = self.create_task(task_func=self.get_new_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
1601             self.add_task(task)
1602             task = self.create_task(task_func=self.get_running_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
1603             self.add_task(task)
1604             task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
1605             self.add_task(task)
1606             task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
1607             self.add_task(task)
1608 
1609             self.execute()
1610         except KeyboardInterrupt:
1611             self.stop()
1612 
1613 
if __name__ == '__main__':
    # Script entry point: create a Transformer agent and call the instance.
    # NOTE(review): calling the agent presumably starts it through the base
    # agent's __call__; confirm against BaseAgent.
    agent = Transformer()
    agent()