File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import copy
0012 import datetime
0013 import random
0014 import time
0015 import traceback
0016
0017 from idds.common import exceptions
0018 from idds.common.constants import (Sections, ReturnCode, TransformType,
0019 TransformStatus, TransformLocking,
0020 CollectionType, CollectionStatus,
0021 CollectionRelationType,
0022 ContentStatus, ContentRelationType,
0023 CommandType, ProcessingStatus, WorkflowType,
0024 ConditionStatus,
0025 get_processing_type_from_transform_type,
0026 get_transform_status_from_processing_status)
0027 from idds.common.utils import setup_logging, truncate_string
0028 from idds.core import (transforms as core_transforms,
0029 processings as core_processings,
0030 catalog as core_catalog,
0031 throttlers as core_throttlers,
0032 conditions as core_conditions)
0033 from idds.agents.common.baseagent import BaseAgent
0034 from idds.agents.common.eventbus.event import (EventType,
0035 NewTransformEvent,
0036 UpdateTransformEvent,
0037 AbortProcessingEvent,
0038 ResumeProcessingEvent,
0039 UpdateRequestEvent,
0040 NewProcessingEvent,
0041 UpdateProcessingEvent)
0042
0043 from idds.agents.common.cache.redis import get_redis_cache
0044
0045 setup_logging(__name__)
0046
0047
0048 class Transformer(BaseAgent):
0049 """
0050 Transformer works to process transforms.
0051 """
0052
0053 def __init__(self, num_threads=1, max_number_workers=8, poll_period=1800, retries=3, retrieve_bulk_size=3,
0054 use_process_pool=False, message_bulk_size=10000, **kwargs):
0055 self.max_number_workers = max_number_workers
0056 self.set_max_workers()
0057 num_threads = self.max_number_workers
0058 super(Transformer, self).__init__(num_threads=num_threads, name='Transformer', use_process_pool=use_process_pool, **kwargs)
0059 self.config_section = Sections.Transformer
0060 self.poll_period = int(poll_period)
0061 self.retries = int(retries)
0062 self.retrieve_bulk_size = int(retrieve_bulk_size)
0063 self.message_bulk_size = int(message_bulk_size)
0064
0065 if not hasattr(self, 'new_poll_period') or not self.new_poll_period:
0066 self.new_poll_period = self.poll_period
0067 else:
0068 self.new_poll_period = int(self.new_poll_period)
0069 if not hasattr(self, 'update_poll_period') or not self.update_poll_period:
0070 self.update_poll_period = self.poll_period
0071 else:
0072 self.update_poll_period = int(self.update_poll_period)
0073
0074 if not hasattr(self, 'new_poll_period') or not self.new_poll_period:
0075 self.new_poll_period = self.poll_period
0076 else:
0077 self.new_poll_period = int(self.new_poll_period)
0078 if not hasattr(self, 'update_poll_period') or not self.update_poll_period:
0079 self.update_poll_period = self.poll_period
0080 else:
0081 self.update_poll_period = int(self.update_poll_period)
0082
0083 if hasattr(self, 'poll_period_increase_rate'):
0084 self.poll_period_increase_rate = float(self.poll_period_increase_rate)
0085 else:
0086 self.poll_period_increase_rate = 2
0087
0088 if hasattr(self, 'max_new_poll_period'):
0089 self.max_new_poll_period = int(self.max_new_poll_period)
0090 else:
0091 self.max_new_poll_period = 3600 * 6
0092 if hasattr(self, 'max_update_poll_period'):
0093 self.max_update_poll_period = int(self.max_update_poll_period)
0094 else:
0095 self.max_update_poll_period = 3600 * 6
0096
0097 self.number_workers = 0
0098 if not hasattr(self, 'max_number_workers') or not self.max_number_workers:
0099 self.max_number_workers = 3
0100 else:
0101 self.max_number_workers = int(self.max_number_workers)
0102
0103 self.show_queue_size_time = None
0104
0105 if hasattr(self, 'cache_expire_seconds'):
0106 self.cache_expire_seconds = int(self.cache_expire_seconds)
0107 else:
0108 self.cache_expire_seconds = 300
0109
0110 if hasattr(self, 'clean_locks_time_period'):
0111 self.clean_locks_time_period = int(self.clean_locks_time_period)
0112 else:
0113 self.clean_locks_time_period = 1800
0114
0115 def is_ok_to_run_more_transforms(self):
0116 if self.get_num_free_workers() > 0:
0117 return True
0118 return False
0119
0120 def show_queue_size(self):
0121 if self.show_queue_size_time is None or time.time() - self.show_queue_size_time >= 600:
0122 self.show_queue_size_time = time.time()
0123 q_str = "number of transforms: %s, max number of transforms: %s" % (self.get_num_workers(), self.get_max_workers())
0124 self.logger.debug(q_str)
0125
0126 def get_bulk_size(self):
0127 return min(self.retrieve_bulk_size, self.get_num_free_workers())
0128
0129 def get_throttlers(self):
0130 """
0131 Use throttler
0132 """
0133 cache = get_redis_cache()
0134 throttlers = cache.get("throttlers", default=None)
0135 if throttlers is None:
0136 throttler_items = core_throttlers.get_throttlers()
0137 throttlers = {}
0138 for item in throttler_items:
0139 throttlers[item['site']] = {'num_requests': item['num_requests'],
0140 'num_transforms': item['num_transforms'],
0141 'num_processings': item['num_processings'],
0142 'new_contents': item['new_contents'],
0143 'queue_contents': item['queue_contents'],
0144 'others': item['others'],
0145 'status': item['status']}
0146 cache.set("throttlers", throttlers, expire_seconds=self.cache_expire_seconds)
0147 return throttlers
0148
    def get_num_active_transforms(self, site_name):
        """
        Return {'new': int, 'processing': int} counters of active transforms
        for `site_name`; statistics for all sites are rebuilt on cache miss
        and cached in redis for cache_expire_seconds.
        """
        cache = get_redis_cache()
        num_transforms = cache.get("num_transforms", default=None)
        if num_transforms is None:
            # Cache miss: aggregate per-site counts from the database.
            num_transforms = {}
            active_status = [TransformStatus.New, TransformStatus.Ready]
            active_status1 = [TransformStatus.Transforming, TransformStatus.Terminating]
            rets = core_transforms.get_num_active_transforms(active_status + active_status1)
            for ret in rets:
                status, site, count = ret
                # Transforms without an explicit site are grouped under 'Default'.
                if site is None:
                    site = 'Default'
                if site not in num_transforms:
                    num_transforms[site] = {'new': 0, 'processing': 0}
                if status in active_status:
                    num_transforms[site]['new'] += count
                elif status in active_status1:
                    num_transforms[site]['processing'] += count
            cache.set("num_transforms", num_transforms, expire_seconds=self.cache_expire_seconds)
        default_value = {'new': 0, 'processing': 0}
        return num_transforms.get(site_name, default_value)
0170
    def get_num_active_processings(self, site_name):
        """
        Return (counters, active_transforms): counters is the per-site
        {'new': int, 'processing': int} for `site_name`; active_transforms maps
        site -> list of transform ids whose processings are in flight.

        Both structures are rebuilt together on cache miss and cached in redis.
        NOTE(review): they are cached under separate keys, so one can in
        principle expire before the other — confirm this is acceptable.
        """
        cache = get_redis_cache()
        num_processings = cache.get("num_processings", default=None)
        active_transforms = cache.get("active_transforms", default={})
        if num_processings is None:
            num_processings = {}
            active_transforms = {}
            active_status = [ProcessingStatus.New]
            active_status1 = [ProcessingStatus.Submitting, ProcessingStatus.Submitted,
                              ProcessingStatus.Running, ProcessingStatus.Terminating, ProcessingStatus.ToTrigger,
                              ProcessingStatus.Triggering]
            rets = core_processings.get_active_processings(active_status + active_status1)
            for ret in rets:
                req_id, trf_id, pr_id, site, status = ret
                # Processings without an explicit site are grouped under 'Default'.
                if site is None:
                    site = 'Default'
                if site not in num_processings:
                    num_processings[site] = {'new': 0, 'processing': 0}
                    active_transforms[site] = []
                if status in active_status:
                    num_processings[site]['new'] += 1
                elif status in active_status1:
                    num_processings[site]['processing'] += 1
                    active_transforms[site].append(trf_id)
            cache.set("num_processings", num_processings, expire_seconds=self.cache_expire_seconds)
            cache.set("active_transforms", active_transforms, expire_seconds=self.cache_expire_seconds)
        default_value = {'new': 0, 'processing': 0}
        return num_processings.get(site_name, default_value), active_transforms
0199
0200 def get_num_active_contents(self, site_name, active_transform_ids):
0201 cache = get_redis_cache()
0202
0203
0204 tf_id_site_map = {}
0205 all_tf_ids = []
0206 for site in active_transform_ids:
0207 all_tf_ids += active_transform_ids[site]
0208 for tf_id in active_transform_ids[site]:
0209 tf_id_site_map[tf_id] = site
0210
0211 num_input_contents = cache.get("num_input_contents", default=None)
0212 num_output_contents = cache.get("num_output_contents", default=None)
0213 if num_input_contents is None or num_output_contents is None:
0214 num_input_contents, num_output_contents = {}, {}
0215 if all_tf_ids:
0216 ret = core_catalog.get_content_status_statistics_by_relation_type(all_tf_ids)
0217 for item in ret:
0218 status, relation_type, transform_id, count = item
0219 site = tf_id_site_map[transform_id]
0220 if site not in num_input_contents:
0221 num_input_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
0222 num_output_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
0223 if status in [ContentStatus.New]:
0224 if relation_type == ContentRelationType.Input:
0225 num_input_contents[site]['new'] += count
0226 elif relation_type == ContentRelationType.Output:
0227 num_output_contents[site]['new'] += count
0228 if status in [ContentStatus.Activated]:
0229 if relation_type == ContentRelationType.Input:
0230 num_input_contents[site]['activated'] += count
0231 elif relation_type == ContentRelationType.Output:
0232 num_output_contents[site]['activated'] += count
0233 else:
0234 if relation_type == ContentRelationType.Input:
0235 num_input_contents[site]['processed'] += count
0236 elif relation_type == ContentRelationType.Output:
0237 num_output_contents[site]['processed'] += count
0238
0239 cache.set("num_input_contents", num_input_contents, expire_seconds=self.cache_expire_seconds)
0240 cache.set("num_output_contents", num_output_contents, expire_seconds=self.cache_expire_seconds)
0241 default_value = {'new': 0, 'activated': 0, 'processed': 0}
0242 return num_input_contents.get(site_name, default_value), num_output_contents.get(site_name, default_value)
0243
0244 def get_closest_site(self, task_site, throttler_sites):
0245 try:
0246 self.logger.debug(f"task_site: {task_site}, throttler_sites: {throttler_sites}")
0247 if ',' in task_site:
0248 cloud, site, queue = task_site.split(",")
0249 else:
0250
0251 site = task_site
0252 queue = None
0253
0254
0255 sorted_sites = sorted(throttler_sites, key=lambda x: (-len(x), x))
0256 for s in sorted_sites:
0257 if queue and queue.startswith(s):
0258 return s
0259 elif site and site.startswith(s):
0260 return s
0261 except Exception as ex:
0262 self.logger.warn(f"Failed to find closest site for {task_site}: {ex}")
0263 return None
0264
    def whether_to_throttle(self, transform):
        """
        Check the transform's site against the configured throttler limits:
        active transforms, active processings, new contents and queued contents.

        :returns: True when any configured limit is reached (throttle),
                  False otherwise. On internal errors it returns False,
                  i.e. it fails open rather than blocking transforms.
        """
        try:
            throttlers = self.get_throttlers()

            site = transform['site']
            if site is None:
                site = 'Default'
            else:
                # Resolve the task's site/queue to the closest configured
                # throttler site; fall back to 'Default' when none matches.
                throttler_sites = [site for site in throttlers]
                site = self.get_closest_site(site, throttler_sites)
                if site is None:
                    site = 'Default'
                self.logger.info(f"throttler closest site for {transform['site']} is {site}")

            num_transforms = self.get_num_active_transforms(site)
            num_processings, active_transforms = self.get_num_active_processings(site)
            num_input_contents, num_output_contents = self.get_num_active_contents(site, active_transforms)
            self.logger.info("throttler(site: %s): transforms(%s), processings(%s)" % (site, num_transforms, num_processings))
            self.logger.info("throttler(site: %s): active input contents(%s), output contents(%s)" % (site, num_input_contents, num_output_contents))

            # A missing/None limit disables that particular check.
            throttle_transforms = throttlers.get(site, {}).get('num_transforms', None)
            throttle_processings = throttlers.get(site, {}).get('num_processings', None)
            throttle_new_jobs = throttlers.get(site, {}).get('new_contents', None)
            throttle_queue_jobs = throttlers.get(site, {}).get('queue_contents', None)
            self.logger.info("throttler(site: %s): throttle_transforms: %s, throttle_processings: %s" % (site, throttle_transforms, throttle_processings))
            if throttle_transforms:
                if num_transforms['processing'] >= throttle_transforms:
                    self.logger.info("throttler(site: %s): num of processing transforms (%s) is bigger than throttle_transforms (%s), set throttling" % (site, num_transforms['processing'], throttle_transforms))
                    return True
            if throttle_processings:
                if num_processings['processing'] >= throttle_processings:
                    self.logger.info("throttler(site: %s): num of processing processings (%s) is bigger than throttle_processings (%s), set throttling" % (site, num_processings['processing'], throttle_processings))
                    return True

            # queue_jobs = inputs released for processing minus outputs already
            # terminated, i.e. work currently in flight.
            new_jobs = num_input_contents['new']
            released_jobs = num_input_contents['processed']
            terminated_jobs = num_output_contents['processed']
            queue_jobs = released_jobs - terminated_jobs

            self.logger.info("throttler(site: %s): throttle_new_jobs: %s, throttle_queue_jobs: %s" % (site, throttle_new_jobs, throttle_queue_jobs))
            self.logger.info("throttler(site: %s): new_jobs: %s, queue_jobs: %s" % (site, new_jobs, queue_jobs))
            if throttle_new_jobs:
                if new_jobs >= throttle_new_jobs:
                    self.logger.info("throttler(site: %s): num of new jobs(not released) (%s) is bigger than throttle_new_jobs (%s), set throttling" % (site, new_jobs, throttle_new_jobs))
                    return True
            if throttle_queue_jobs:
                if queue_jobs >= throttle_queue_jobs:
                    self.logger.info("throttler(site: %s): num of queue jobs(released but not terminated) (%s) is bigger than throttle_queue_jobs (%s), set throttling" % (site, queue_jobs, throttle_queue_jobs))
                    return True

            return False
        except Exception as ex:
            self.logger.error("whether_to_throttle: %s" % str(ex))
            self.logger.error(traceback.format_exc())
            return False
0320
    def get_queue_transforms(self):
        """
        Get queue transforms to set them to new if the throttler is ok.

        Fetches Queue/Throttling transforms and processes each one
        synchronously via process_queue_transform (unlike get_new_transforms,
        which submits to worker threads).

        NOTE(review): the success path has no explicit return, so this returns
        None on success and [] only on early exit / database error — confirm
        callers don't rely on the return value.
        """
        try:
            if not self.is_ok_to_run_more_transforms():
                return []

            self.show_queue_size()

            # Without a known minimum request id the query cannot be bounded.
            if BaseAgent.min_request_id is None:
                return []

            transform_status = [TransformStatus.Queue, TransformStatus.Throttling]

            transforms_q = core_transforms.get_transforms_by_status(status=transform_status, locking=True,
                                                                    not_lock=False, order_by_fifo=True,
                                                                    new_poll=True,
                                                                    min_request_id=BaseAgent.min_request_id)

            if transforms_q:
                self.logger.info("Main thread get queued transforms to process: %s" % str(transforms_q))
                for tf in transforms_q:
                    self.process_queue_transform(transform=tf)
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                # Oracle deadlock: transient, retried on the next cycle.
                self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return []
0354
    def process_queue_transform(self, event=None, transform=None):
        """
        Evaluate a Queue/Throttling transform against the throttlers: keep it
        in Throttling, or release it to New and publish a NewTransformEvent.

        Called either with a transform record directly (from
        get_queue_transforms) or with an event carrying a transform_id.
        """
        self.number_workers += 1
        try:
            if transform is None and event:
                # Event path: re-fetch and lock the transform by id.
                tf_status = [TransformStatus.Queue, TransformStatus.Throttling]
                tf = self.get_transform(transform_id=event._transform_id, status=tf_status, locking=True)
                if not tf:
                    self.logger.warn("Cannot find transform for event: %s" % str(event))
                else:
                    transform = tf
            if transform:
                tf = transform
                log_pre = self.get_log_prefix(tf)
                self.logger.info(log_pre + "process_queue_transform")
                to_throttle = self.whether_to_throttle(tf)
                transform_parameters = {'locking': TransformLocking.Idle}
                parameters = self.load_poll_period(tf, transform_parameters)
                if to_throttle:
                    # Keep it parked in Throttling; re-evaluated next poll cycle.
                    parameters['status'] = TransformStatus.Throttling
                    core_transforms.update_transform(transform_id=tf['transform_id'], parameters=parameters)
                else:
                    # Release: mark New and notify via the event bus.
                    parameters['status'] = TransformStatus.New
                    core_transforms.update_transform(transform_id=tf['transform_id'], parameters=parameters)

                    self.logger.info(log_pre + "NewTransformEvent(transform_id: %s)" % str(tf['transform_id']))
                    event = NewTransformEvent(publisher_id=self.id, transform_id=tf['transform_id'])
                    self.event_bus.send(event)
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
        self.number_workers -= 1
0389
    def get_new_transforms(self):
        """
        Get new transforms to process.

        Fetches New/Ready/Extend transforms (FIFO, bounded by the free-worker
        bulk size) and submits each one to a worker via process_new_transform.

        :returns: the fetched transforms, or [] when idle or on database errors.
        """
        try:
            if not self.is_ok_to_run_more_transforms():
                return []

            self.show_queue_size()

            # Without a known minimum request id the query cannot be bounded.
            if BaseAgent.min_request_id is None:
                return []

            transform_status = [TransformStatus.New, TransformStatus.Ready, TransformStatus.Extend]

            transforms_new = core_transforms.get_transforms_by_status(status=transform_status, locking=True,
                                                                      not_lock=True, order_by_fifo=True,
                                                                      new_poll=True,
                                                                      min_request_id=BaseAgent.min_request_id,
                                                                      bulk_size=self.get_bulk_size())

            if transforms_new:
                tf_ids = [tf['transform_id'] for tf in transforms_new]
                self.logger.info("Main thread get New+Ready+Extend transforms to process: %s" % str(tf_ids))

                for tf in transforms_new:
                    self.submit(self.process_new_transform, transform=tf)

            return transforms_new
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                # Oracle deadlock: transient, retried on the next cycle.
                self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return []
0428
    def get_running_transforms(self):
        """
        Get running transforms.

        Fetches transforms in Transforming or any To*/…ing control state that
        are due for an update poll, and submits each one to a worker via
        process_update_transform.

        :returns: the fetched transforms, or [] when idle or on database errors.
        """
        try:
            if not self.is_ok_to_run_more_transforms():
                return []

            self.show_queue_size()

            # Without a known minimum request id the query cannot be bounded.
            if BaseAgent.min_request_id is None:
                return []

            transform_status = [TransformStatus.Transforming,
                                TransformStatus.ToCancel, TransformStatus.Cancelling,
                                TransformStatus.ToSuspend, TransformStatus.Suspending,
                                TransformStatus.ToExpire, TransformStatus.Expiring,
                                TransformStatus.ToResume, TransformStatus.Resuming,
                                TransformStatus.ToFinish, TransformStatus.ToForceFinish]
            transforms = core_transforms.get_transforms_by_status(status=transform_status,
                                                                  period=None,
                                                                  locking=True,
                                                                  not_lock=True,
                                                                  min_request_id=BaseAgent.min_request_id,
                                                                  update_poll=True,
                                                                  bulk_size=self.get_bulk_size())

            if transforms:
                tf_ids = [tf['transform_id'] for tf in transforms]
                self.logger.info("Main thread get transforming transforms to process: %s" % str(tf_ids))

                for tf in transforms:
                    self.submit(self.process_update_transform, transform=tf)

            return transforms
        except exceptions.DatabaseException as ex:
            if 'ORA-00060' in str(ex):
                # Oracle deadlock: transient, retried on the next cycle.
                self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
            else:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return []
0472
0473 def get_transform(self, transform_id, status=None, locking=False):
0474 try:
0475 return core_transforms.get_transform_by_id_status(transform_id=transform_id, status=status, locking=locking)
0476 except exceptions.DatabaseException as ex:
0477 if 'ORA-00060' in str(ex):
0478 self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
0479 else:
0480
0481 self.logger.error(ex)
0482 self.logger.error(traceback.format_exc())
0483 return None
0484
0485 def load_poll_period(self, transform, parameters):
0486 if self.new_poll_period and transform['new_poll_period'] != self.new_poll_period:
0487 parameters['new_poll_period'] = self.new_poll_period
0488 if self.update_poll_period and transform['update_poll_period'] != self.update_poll_period:
0489 parameters['update_poll_period'] = self.update_poll_period
0490 return parameters
0491
    def trigger_condition(self, request_id, condition):
        """
        Evaluate one condition record and collect the resulting updates.

        :param request_id: the request the condition belongs to.
        :param condition: the condition record to evaluate.
        :returns: (is_triggered, is_updated, update_condition, update_transforms)
                  where update_condition holds the condition-table changes and
                  update_transforms lists WaitForTrigger transforms to set New.
        """
        update_condition = {}
        update_transforms = []
        cond = core_conditions.load_condition(condition)
        is_triggered, is_updated, ret = cond.evaluate()
        if is_triggered or is_updated:
            update_condition['condition_id'] = cond.condition_id
            update_condition['previous_works'] = cond.previous_works

        if is_triggered:
            update_condition['condition_id'] = cond.condition_id
            update_condition['status'] = ConditionStatus.Triggered
            update_condition['evaluate_result'] = cond.result

            triggered_works = cond.get_triggered_works(ret)
            if triggered_works:
                internal_ids = [w['internal_id'] for w in triggered_works]
                triggered_transforms = core_transforms.get_transforms(request_id=request_id,
                                                                      loop_index=condition['loop_index'],
                                                                      internal_ids=internal_ids)
                for tf in triggered_transforms:
                    if tf['status'] in [TransformStatus.WaitForTrigger]:
                        # Only transforms still waiting are released to New.
                        u_transform = {'transform_id': tf['transform_id'], 'status': TransformStatus.New}
                        update_transforms.append(u_transform)
        return is_triggered, is_updated, update_condition, update_transforms
0518
    def evaluate_conditions(self, transform):
        """
        Evaluate the transform's untriggered conditions and move the ones that
        fired into its triggered list.

        :returns: (update_current_transform, update_conditions,
                  all_update_triggered_transforms), or None when the transform
                  has no conditions.
        """
        # NOTE: 'has_conditons' (sic) matches the key spelling used by the data model.
        if not transform['has_conditons']:
            return

        update_conditions = []
        all_update_triggered_transforms = []
        update_current_transform = None

        loop_index = transform['loop_index']
        triggered_conditions = transform['triggered_conditions']
        untriggered_conditions = transform['untriggered_conditions']

        new_triggered_conditions = []
        u_cond_ids = [u_cond['internal_id'] for u_cond in untriggered_conditions]
        conditions = core_conditions.get_condtions(request_id=transform['request_id'], internal_ids=u_cond_ids, loop_index=loop_index)
        cond_dict = {}
        for cond in conditions:
            # Only keep condition records from the matching loop iteration.
            if (loop_index is None and cond['loop_index'] is None) or (loop_index == cond['loop_index']):
                cond_dict[cond['internal_id']] = cond
        for u_cond in untriggered_conditions:
            cond = cond_dict[u_cond['internal_id']]
            if cond['status'] not in [ConditionStatus.WaitForTrigger]:
                ret = self.trigger_condition(request_id=transform['request_id'], condition=cond)
                is_triggered, is_updated, update_condition, update_triggered_transforms = ret
                if is_triggered or is_updated:
                    new_triggered_conditions.append(u_cond)
                    if update_condition:
                        update_conditions.append(update_condition)
                    if update_triggered_transforms:
                        all_update_triggered_transforms = all_update_triggered_transforms + update_triggered_transforms
            else:
                new_triggered_conditions.append(u_cond)
        if new_triggered_conditions:
            # Move newly triggered conditions from untriggered to triggered.
            new_triggered_conditions_dict = {new_cond['internal_id']: new_cond for new_cond in new_triggered_conditions}
            untriggered_conditions_copy = copy.deepcopy(untriggered_conditions)
            untriggered_conditions = []
            for u_cond in untriggered_conditions_copy:
                if u_cond['internal_id'] in new_triggered_conditions_dict:
                    triggered_conditions.append(u_cond)
                else:
                    untriggered_conditions.append(u_cond)
            update_current_transform = {'transform_id': transform['transform_id'],
                                        'triggered_conditions': triggered_conditions,
                                        'untriggered_conditions': untriggered_conditions}

        return update_current_transform, update_conditions, all_update_triggered_transforms
0568
0569 def generate_processing_model(self, transform):
0570 new_processing_model = {}
0571 new_processing_model['transform_id'] = transform['transform_id']
0572 new_processing_model['request_id'] = transform['request_id']
0573 new_processing_model['workload_id'] = transform['workload_id']
0574 new_processing_model['status'] = ProcessingStatus.New
0575
0576 new_processing_model['expired_at'] = transform['expired_at']
0577
0578 new_processing_model['loop_index'] = transform['loop_index']
0579 new_processing_model['internal_id'] = transform['internal_id']
0580 new_processing_model['parent_internal_id'] = transform['parent_internal_id']
0581
0582 new_processing_model['processing_type'] = get_processing_type_from_transform_type(transform['transform_type'])
0583 new_processing_model['new_poll_period'] = transform['new_poll_period']
0584 new_processing_model['update_poll_period'] = transform['update_poll_period']
0585 new_processing_model['max_new_retries'] = transform['max_new_retries']
0586 new_processing_model['max_update_retries'] = transform['max_update_retries']
0587 return new_processing_model
0588
0589 def get_log_prefix(self, transform):
0590 if transform:
0591 return "<request_id=%s,transform_id=%s>" % (transform['request_id'], transform['transform_id'])
0592 self.logger.error("get_log_prefix transform is empty: %s" % str(transform))
0593 return ""
0594
    def handle_new_transform_real(self, transform):
        """
        Process new transform.

        Binds the work object to the transform, reuses an already-persisted
        processing when one exists, otherwise asks the work to create one and
        builds a new processing model for it.

        :returns: {'transform', 'transform_parameters', 'new_processing'};
                  'new_processing' is None when an existing processing was reused.
        """
        log_pre = self.get_log_prefix(transform)
        self.logger.info(log_pre + "handle_new_transform: transform_id: %s" % transform['transform_id'])

        work = transform['transform_metadata']['work']
        work.set_work_id(transform['transform_id'])
        work.set_agent_attributes(self.agent_attributes, transform)

        # Provide the work with the request-wide work-name -> collection map.
        work_name_to_coll_map = core_transforms.get_work_name_to_coll_map(request_id=transform['request_id'])
        work.set_work_name_to_coll_map(work_name_to_coll_map)

        new_processing_model = None

        # without_creating=True: only look up an already-attached processing.
        processing = work.get_processing(input_output_maps=[], without_creating=True)
        self.logger.debug(log_pre + "work get_processing: %s" % processing)
        processing_model = core_processings.get_processing(request_id=transform['request_id'], transform_id=transform['transform_id'])
        if processing_model:
            # A processing already exists (e.g. this transform was handled
            # before): sync the in-memory work with the persisted state.
            work.sync_processing(processing, processing_model)
            proc = processing_model['processing_metadata']['processing']
            work.sync_work_data(status=processing_model['status'], substatus=processing_model['substatus'],
                                work=proc.work, output_data=processing_model['output_metadata'], processing=proc)

            if processing_model['errors']:
                work.set_terminated_msg(processing_model['errors'])

            work.set_output_data(processing.output_data)
            transform['workload_id'] = processing_model['workload_id']
        else:
            # No persisted processing: let the work create one.
            processing = work.get_processing(input_output_maps=[], without_creating=False)
            self.logger.debug(log_pre + "work get_processing with creating: %s" % processing)
            if processing and not processing.processing_id:
                new_processing_model = self.generate_processing_model(transform)

                # Store a cleaned deep copy of the work in the processing metadata.
                proc_work = copy.deepcopy(work)
                proc_work.clean_work()
                processing.work = proc_work
                new_processing_model['processing_metadata'] = {'processing': processing}

        transform_parameters = {'status': TransformStatus.Transforming,
                                'locking': TransformLocking.Idle,
                                'workload_id': transform['workload_id'],
                                'transform_metadata': transform['transform_metadata']}

        transform_parameters = self.load_poll_period(transform, transform_parameters)

        if new_processing_model is not None:
            # Propagate any refreshed poll/retry settings to the new processing.
            if 'new_poll_period' in transform_parameters:
                new_processing_model['new_poll_period'] = transform_parameters['new_poll_period']
            if 'update_poll_period' in transform_parameters:
                new_processing_model['update_poll_period'] = transform_parameters['update_poll_period']
            if 'max_new_retries' in transform_parameters:
                new_processing_model['max_new_retries'] = transform_parameters['max_new_retries']
            if 'max_update_retries' in transform_parameters:
                new_processing_model['max_update_retries'] = transform_parameters['max_update_retries']

        ret = {'transform': transform,
               'transform_parameters': transform_parameters,
               'new_processing': new_processing_model
               }
        return ret
0659
    def handle_new_transform(self, transform, check_previous=False):
        """
        Process new transform.

        Wrapper around handle_new_transform_real with optional gating on
        parent works and retry/error accounting on failure. When
        check_previous is True, the transform is only processed once every
        parent transform (from work.parent_internal_id) has a workload_id;
        otherwise it is just unlocked and left for a later poll.

        :returns: {'transform', 'transform_parameters', ...} to be applied
                  by the caller.
        """
        try:
            log_pre = self.get_log_prefix(transform)
            work = transform['transform_metadata']['work']
            if not check_previous:
                ret = self.handle_new_transform_real(transform)
            else:
                pre_works_are_ok = True
                if work.parent_internal_id is not None:
                    # parent_internal_id may be a comma-separated list of parents.
                    parent_internal_ids = work.parent_internal_id.split(",")
                    tfs = core_transforms.get_transforms(request_id=transform['request_id'], internal_ids=parent_internal_ids, loop_index=transform['loop_index'])
                    if not tfs:
                        pre_works_are_ok = False
                    else:
                        for tf in tfs:
                            if not tf['workload_id']:
                                pre_works_are_ok = False
                if pre_works_are_ok:
                    ret = self.handle_new_transform_real(transform)
                else:
                    # Parents not ready yet: just release the lock and retry later.
                    transform_parameters = {'locking': TransformLocking.Idle}
                    ret = {'transform': transform,
                           'transform_parameters': transform_parameters,
                           }
            self.logger.info(log_pre + "handle_new_transform result: %s" % str(ret))
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            retries = transform['new_retries'] + 1
            # Keep retrying until max_new_retries is exhausted (0/None = unlimited).
            if not transform['max_new_retries'] or retries < transform['max_new_retries']:
                tf_status = transform['status']
            else:
                tf_status = TransformStatus.Failed

            # Back off the new-poll period, capped at max_new_poll_period.
            new_poll_period = int(transform['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
            if new_poll_period > self.max_new_poll_period:
                new_poll_period = self.max_new_poll_period

            error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}

            transform_parameters = {'status': tf_status,
                                    'new_retries': retries,
                                    'new_poll_period': new_poll_period,
                                    'errors': transform['errors'] if transform['errors'] else {},
                                    'locking': TransformLocking.Idle}
            transform_parameters['errors'].update(error)
            ret = {'transform': transform, 'transform_parameters': transform_parameters}
            self.logger.info(log_pre + "handle_new_transform exception result: %s" % str(ret))
        return ret
0714
0715 def generate_collection(self, transform, collection, relation_type=CollectionRelationType.Input):
0716 coll = {'transform_id': transform['transform_id'],
0717 'request_id': transform['request_id'],
0718 'workload_id': transform['workload_id'],
0719 'coll_type': CollectionType.Dataset,
0720 'scope': collection['scope'],
0721 'name': collection['name'][:254],
0722 'relation_type': relation_type,
0723 'bytes': 0,
0724 'total_files': 0,
0725 'new_files': 0,
0726 'processed_files': 0,
0727 'processing_files': 0,
0728 'coll_metadata': None,
0729 'status': CollectionStatus.Open,
0730 'expired_at': transform['expired_at']}
0731 return coll
0732
    def handle_new_itransform_real(self, transform):
        """
        Process new transform (iWorkflow/iWork flavour).

        Always creates a fresh processing model, plus pseudo input/output
        dataset collections named after the work's function.

        :returns: {'transform', 'transform_parameters', 'new_processing',
                  'input_collections', 'output_collections'}.
        """
        log_pre = self.get_log_prefix(transform)
        self.logger.info(log_pre + "handle_new_itransform: transform_id: %s" % transform['transform_id'])

        work = transform['transform_metadata']['work']
        if work.workflow_type in [WorkflowType.iWork]:
            work.transform_id = transform['transform_id']

        new_processing_model = self.generate_processing_model(transform)
        new_processing_model['processing_metadata'] = {'work': work}

        transform_parameters = {'status': TransformStatus.Transforming,
                                'locking': TransformLocking.Idle,
                                'workload_id': transform['workload_id']}

        transform_parameters = self.load_poll_period(transform, transform_parameters)

        if new_processing_model is not None:
            # Propagate any refreshed poll/retry settings to the new processing.
            if 'new_poll_period' in transform_parameters:
                new_processing_model['new_poll_period'] = transform_parameters['new_poll_period']
            if 'update_poll_period' in transform_parameters:
                new_processing_model['update_poll_period'] = transform_parameters['update_poll_period']
            if 'max_new_retries' in transform_parameters:
                new_processing_model['max_new_retries'] = transform_parameters['max_new_retries']
            if 'max_update_retries' in transform_parameters:
                new_processing_model['max_update_retries'] = transform_parameters['max_update_retries']

        # iWork has no real datasets; create pseudo collections named after
        # the work's function (the part after the last ':').
        func_name = work.get_func_name()
        func_name = func_name.split(':')[-1]
        input_coll = {'scope': 'pseudo_dataset', 'name': 'pseudo_input_%s' % func_name}
        output_coll = {'scope': 'pseudo_dataset', 'name': 'pseudo_output_%s' % func_name}

        input_collection = self.generate_collection(transform, input_coll, relation_type=CollectionRelationType.Input)
        output_collection = self.generate_collection(transform, output_coll, relation_type=CollectionRelationType.Output)

        ret = {'transform': transform,
               'transform_parameters': transform_parameters,
               'new_processing': new_processing_model,
               'input_collections': [input_collection],
               'output_collections': [output_collection]
               }
        return ret
0779
0780 def handle_new_itransform(self, transform, check_previous=False):
0781 """
0782 Process new transform
0783 """
0784 try:
0785 log_pre = self.get_log_prefix(transform)
0786 work = transform['transform_metadata']['work']
0787 pre_works_are_ok = True
0788 pre_workload_id = None
0789 if work.parent_internal_id is not None:
0790 parent_internal_ids = work.parent_internal_id.split(",")
0791 tfs = core_transforms.get_transforms(request_id=transform['request_id'], internal_ids=parent_internal_ids, loop_index=transform['loop_index'])
0792 self.logger.info(log_pre + f"handle_new_itransform parent_internal_id {work.parent_internal_id}, parent_transforms: {tfs}")
0793 if not tfs:
0794 pre_works_are_ok = False
0795 else:
0796 for tf in tfs:
0797 if not tf['workload_id']:
0798 pre_works_are_ok = False
0799 pre_workload_id = tf['workload_id']
0800 if pre_workload_id:
0801 transform['transform_metadata']['work'].parent_workload_id = pre_workload_id
0802 if pre_works_are_ok:
0803 ret = self.handle_new_itransform_real(transform)
0804 else:
0805
0806 transform_parameters = {'locking': TransformLocking.Idle}
0807 ret = {'transform': transform,
0808 'transform_parameters': transform_parameters,
0809 }
0810
0811 self.logger.info(log_pre + "handle_new_itransform result: %s" % str(ret))
0812 except Exception as ex:
0813 self.logger.error(ex)
0814 self.logger.error(traceback.format_exc())
0815 retries = transform['new_retries'] + 1
0816 if not transform['max_new_retries'] or retries < transform['max_new_retries']:
0817 tf_status = transform['status']
0818 else:
0819 tf_status = TransformStatus.Failed
0820
0821
0822 new_poll_period = int(transform['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0823 if new_poll_period > self.max_new_poll_period:
0824 new_poll_period = self.max_new_poll_period
0825
0826 error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0827
0828 transform_parameters = {'status': tf_status,
0829 'new_retries': retries,
0830 'new_poll_period': new_poll_period,
0831 'errors': transform['errors'] if transform['errors'] else {},
0832 'locking': TransformLocking.Idle}
0833 transform_parameters['errors'].update(error)
0834 ret = {'transform': transform, 'transform_parameters': transform_parameters}
0835 self.logger.info(log_pre + "handle_new_itransform exception result: %s" % str(ret))
0836 return ret
0837
0838 def handle_new_generic_transform(self, transform):
0839 """
0840 Process new transform
0841 """
0842 try:
0843 log_pre = self.get_log_prefix(transform)
0844 if transform['transform_type'] in [TransformType.GenericWorkflow]:
0845 ret = self.handle_new_generic_transform_real(transform)
0846 elif transform['transform_type'] in [TransformType.GenericWork]:
0847 ret = self.handle_new_generic_transform_real(transform)
0848 self.logger.info(log_pre + "handle_new_generic_transform result: %s" % str(ret))
0849 except Exception as ex:
0850 self.logger.error(ex)
0851 self.logger.error(traceback.format_exc())
0852 retries = transform['new_retries'] + 1
0853 if not transform['max_new_retries'] or retries < transform['max_new_retries']:
0854 tf_status = transform['status']
0855 else:
0856 tf_status = TransformStatus.Failed
0857
0858
0859 new_poll_period = int(transform['new_poll_period'].total_seconds() * self.poll_period_increase_rate)
0860 if new_poll_period > self.max_new_poll_period:
0861 new_poll_period = self.max_new_poll_period
0862
0863 error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
0864
0865 transform_parameters = {'status': tf_status,
0866 'new_retries': retries,
0867 'new_poll_period': new_poll_period,
0868 'errors': transform['errors'] if transform['errors'] else {},
0869 'locking': TransformLocking.Idle}
0870 transform_parameters['errors'].update(error)
0871 ret = {'transform': transform, 'transform_parameters': transform_parameters}
0872 self.logger.info(log_pre + "handle_new_generic_transform exception result: %s" % str(ret))
0873 return ret
0874
    def update_transform(self, ret):
        """
        Persist the outcome of a transform handler.

        Writes the transform parameters, collections, contents, messages and
        processing records produced by a handle_* method in a single call to
        core_transforms.add_transform_outputs, retrying on Oracle deadlocks
        (ORA-00060) with a growing randomized backoff. On any other failure it
        falls back to a minimal update so the transform is not left locked.

        :param ret: dict with at least 'transform' and 'transform_parameters';
                    may also carry collection/content/message/processing keys.
        :returns: tuple (new_processing_ids, update_processing_ids).
        """
        new_pr_ids, update_pr_ids = [], []
        try:
            if ret:
                log_pre = self.get_log_prefix(ret['transform'])
                self.logger.info(log_pre + "Update transform: %s" % str(ret))

                # always release the lock and refresh the update timestamp
                ret['transform_parameters']['locking'] = TransformLocking.Idle
                ret['transform_parameters']['updated_at'] = datetime.datetime.utcnow()

                retry = True
                retry_num = 0
                while retry:
                    retry = False
                    retry_num += 1
                    try:
                        new_pr_ids, update_pr_ids = core_transforms.add_transform_outputs(transform=ret['transform'],
                                                                                          transform_parameters=ret['transform_parameters'],
                                                                                          input_collections=ret.get('input_collections', None),
                                                                                          output_collections=ret.get('output_collections', None),
                                                                                          log_collections=ret.get('log_collections', None),
                                                                                          new_contents=ret.get('new_contents', None),
                                                                                          update_input_collections=ret.get('update_input_collections', None),
                                                                                          update_output_collections=ret.get('update_output_collections', None),
                                                                                          update_log_collections=ret.get('update_log_collections', None),
                                                                                          update_contents=ret.get('update_contents', None),
                                                                                          messages=ret.get('messages', None),
                                                                                          update_messages=ret.get('update_messages', None),
                                                                                          new_processing=ret.get('new_processing', None),
                                                                                          update_processing=ret.get('update_processing', None),
                                                                                          message_bulk_size=self.message_bulk_size)
                    except exceptions.DatabaseException as ex:
                        if 'ORA-00060' in str(ex):
                            self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
                            if retry_num < 5:
                                retry = True
                                # sleep a growing random amount so concurrent
                                # workers desynchronize and the deadlock clears
                                if retry_num <= 1:
                                    random_sleep = random.randint(1, 10)
                                elif retry_num <= 2:
                                    random_sleep = random.randint(1, 60)
                                else:
                                    random_sleep = random.randint(1, 120)
                                time.sleep(random_sleep)
                            else:
                                raise ex
                        else:
                            raise ex
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            try:
                # minimal fallback update: keep retry counters/errors but
                # release the lock so the transform can be polled again
                transform_parameters = {'status': TransformStatus.Transforming,
                                        'locking': TransformLocking.Idle}
                if 'new_retries' in ret['transform_parameters']:
                    transform_parameters['new_retries'] = ret['transform_parameters']['new_retries']
                if 'update_retries' in ret['transform_parameters']:
                    transform_parameters['update_retries'] = ret['transform_parameters']['update_retries']
                if 'errors' in ret['transform_parameters']:
                    transform_parameters['errors'] = ret['transform_parameters']['errors']

                log_pre = self.get_log_prefix(ret['transform'])
                self.logger.warn(log_pre + "update transform exception result: %s" % str(transform_parameters))

                new_pr_ids, update_pr_ids = core_transforms.add_transform_outputs(transform=ret['transform'],
                                                                                  transform_parameters=transform_parameters)
            except Exception as ex:
                self.logger.error(ex)
                self.logger.error(traceback.format_exc())
        return new_pr_ids, update_pr_ids
0947
0948 def process_new_transform(self, event=None, transform=None):
0949 self.number_workers += 1
0950 try:
0951 if transform is None and event:
0952 tf_status = [TransformStatus.New, TransformStatus.Ready, TransformStatus.Extend]
0953 tf = self.get_transform(transform_id=event._transform_id, status=tf_status, locking=True)
0954 if not tf:
0955 self.logger.warn("Cannot find transform for event: %s" % str(event))
0956 else:
0957 transform = tf
0958 if transform:
0959 tf = transform
0960 log_pre = self.get_log_prefix(tf)
0961 self.logger.info(log_pre + "process_new_transform")
0962 if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]:
0963 ret = self.handle_new_itransform(tf)
0964 elif tf['transform_type'] in [TransformType.GenericWorkflow, TransformType.GenericWork]:
0965 ret = self.handle_new_generic_transform(tf)
0966 else:
0967 ret = self.handle_new_transform(tf)
0968 self.logger.info(log_pre + "process_new_transform result: %s" % str(ret))
0969
0970 new_pr_ids, update_pr_ids = self.update_transform(ret)
0971 for pr_id in new_pr_ids:
0972 self.logger.info(log_pre + "NewProcessingEvent(processing_id: %s)" % pr_id)
0973 event = NewProcessingEvent(publisher_id=self.id, processing_id=pr_id)
0974 self.event_bus.send(event)
0975 for pr_id in update_pr_ids:
0976 self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % pr_id)
0977 event = UpdateProcessingEvent(publisher_id=self.id, processing_id=pr_id)
0978 self.event_bus.send(event)
0979 except Exception as ex:
0980 self.logger.error(ex)
0981 self.logger.error(traceback.format_exc())
0982 self.number_workers -= 1
0983
    def handle_update_transform_real(self, transform, event):
        """
        Process a running (classic workflow) transform.

        Syncs the work object with its processing record, creates a new
        processing when none exists yet, and derives the terminal transform
        status from the work status.

        :param transform: transform model dict.
        :param event: triggering event; its content may carry an abort/expire command.
        :returns: tuple (ret_dict, is_terminated, processing_id_or_None).
        """
        log_pre = self.get_log_prefix(transform)

        self.logger.info(log_pre + "handle_update_transform: transform_id: %s" % transform['transform_id'])

        is_terminated = False
        to_abort = False
        # an AbortRequest/ExpireRequest command in the event content forces cancellation
        if (event and event._content and 'cmd_type' in event._content and event._content['cmd_type']
            and event._content['cmd_type'] in [CommandType.AbortRequest, CommandType.ExpireRequest]):
            to_abort = True
            self.logger.info(log_pre + "to_abort %s" % to_abort)

        work = transform['transform_metadata']['work']
        work.set_work_id(transform['transform_id'])
        work.set_agent_attributes(self.agent_attributes, transform)

        # let the work resolve collections produced by sibling works of the same request
        work_name_to_coll_map = core_transforms.get_work_name_to_coll_map(request_id=transform['request_id'])
        work.set_work_name_to_coll_map(work_name_to_coll_map)

        new_processing_model, processing_model = None, None
        ret_processing_id = None

        processing = work.get_processing(input_output_maps=[], without_creating=True)
        self.logger.debug(log_pre + "work get_processing: %s" % processing)
        if processing and processing.processing_id:
            # an existing processing: sync its database state back into the work
            ret_processing_id = processing.processing_id
            processing_model = core_processings.get_processing(processing_id=processing.processing_id)
            work.sync_processing(processing, processing_model)
            proc = processing_model['processing_metadata']['processing']
            work.sync_work_data(status=processing_model['status'], substatus=processing_model['substatus'],
                                work=proc.work, output_data=processing_model['output_metadata'], processing=proc)

            if processing_model['errors']:
                work.set_terminated_msg(processing_model['errors'])

            work.set_output_data(processing.output_data)
            transform['workload_id'] = processing_model['workload_id']
        else:
            # no processing yet: create one, embedding a cleaned deep copy
            # of the work so the processing carries its own snapshot
            if not processing:
                processing = work.get_processing(input_output_maps=[], without_creating=False)
                self.logger.debug(log_pre + "work get_processing with creating: %s" % processing)
            new_processing_model = self.generate_processing_model(transform)

            proc_work = copy.deepcopy(work)
            proc_work.clean_work()
            processing.work = proc_work
            new_processing_model['processing_metadata'] = {'processing': processing}

        self.logger.info(log_pre + "syn_work_status: %s, transform status: %s" % (transform['transform_id'], transform['status']))
        if work.is_terminated():
            is_terminated = True
            self.logger.info(log_pre + "Transform(%s) work is terminated: work status: %s" % (transform['transform_id'], work.get_status()))
            if work.is_finished():
                transform['status'] = TransformStatus.Finished
            else:
                if to_abort:
                    transform['status'] = TransformStatus.Cancelled
                elif work.is_subfinished():
                    transform['status'] = TransformStatus.SubFinished
                elif work.is_failed():
                    transform['status'] = TransformStatus.Failed
                else:
                    # unknown terminal state: treat as failed
                    transform['status'] = TransformStatus.Failed

        transform_parameters = {'status': transform['status'],
                                'locking': TransformLocking.Idle,
                                'workload_id': transform['workload_id'],
                                'transform_metadata': transform['transform_metadata']}
        transform_parameters = self.load_poll_period(transform, transform_parameters)

        if new_processing_model is not None:
            # propagate poll/retry settings from the transform onto the new processing
            if 'new_poll_period' in transform_parameters:
                new_processing_model['new_poll_period'] = transform_parameters['new_poll_period']
            if 'update_poll_period' in transform_parameters:
                new_processing_model['update_poll_period'] = transform_parameters['update_poll_period']
            if 'max_new_retries' in transform_parameters:
                new_processing_model['max_new_retries'] = transform_parameters['max_new_retries']
            if 'max_update_retries' in transform_parameters:
                new_processing_model['max_update_retries'] = transform_parameters['max_update_retries']

        ret = {'transform': transform,
               'transform_parameters': transform_parameters,
               'new_processing': new_processing_model}
        return ret, is_terminated, ret_processing_id
1072
1073 def handle_update_transform(self, transform, event):
1074 """
1075 Process running transform
1076 """
1077 try:
1078 log_pre = self.get_log_prefix(transform)
1079
1080 self.logger.info(log_pre + "handle_update_transform: %s" % transform)
1081 ret, is_terminated, ret_processing_id = self.handle_update_transform_real(transform, event)
1082 self.logger.info(log_pre + "handle_update_transform result: %s" % str(ret))
1083 return ret, is_terminated, ret_processing_id
1084 except Exception as ex:
1085 self.logger.error(ex)
1086 self.logger.error(traceback.format_exc())
1087
1088 retries = transform['update_retries'] + 1
1089 if not transform['max_update_retries'] or retries < transform['max_update_retries']:
1090 tf_status = transform['status']
1091 else:
1092 tf_status = TransformStatus.Failed
1093 error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1094
1095
1096 update_poll_period = int(transform['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1097 if update_poll_period > self.max_update_poll_period:
1098 update_poll_period = self.max_update_poll_period
1099
1100 transform_parameters = {'status': tf_status,
1101 'update_retries': retries,
1102 'update_poll_period': update_poll_period,
1103 'errors': transform['errors'] if transform['errors'] else {},
1104 'locking': TransformLocking.Idle}
1105 transform_parameters['errors'].update(error)
1106
1107 ret = {'transform': transform, 'transform_parameters': transform_parameters}
1108 self.logger.warn(log_pre + "handle_update_transform exception result: %s" % str(ret))
1109 return ret, False, None
1110
1111 def handle_update_itransform_real(self, transform, event):
1112 """
1113 process running transforms
1114 """
1115 log_pre = self.get_log_prefix(transform)
1116
1117 self.logger.info(log_pre + "handle_update_itransform: transform_id: %s" % transform['transform_id'])
1118
1119
1120
1121 prs = core_processings.get_processings(transform_id=transform['transform_id'])
1122 pr = None
1123 for pr in prs:
1124 if pr['processing_id'] == transform['current_processing_id']:
1125 transform['workload_id'] = pr['workload_id']
1126 break
1127
1128 errors = None
1129 if pr:
1130 transform['status'] = get_transform_status_from_processing_status(pr['status'])
1131 log_msg = log_pre + "transform id: %s, transform status: %s" % (transform['transform_id'], transform['status'])
1132 log_msg = log_msg + ", processing id: %s, processing status: %s" % (pr['processing_id'], pr['status'])
1133 self.logger.info(log_msg)
1134 else:
1135 transform['status'] = TransformStatus.Failed
1136 log_msg = log_pre + "transform id: %s, transform status: %s" % (transform['transform_id'], transform['status'])
1137 log_msg = log_msg + ", no attached processings."
1138 self.logger.error(log_msg)
1139 errors = {'submit_err': 'no attached processings'}
1140
1141 is_terminated = False
1142 if transform['status'] in [TransformStatus.Finished, TransformStatus.Failed, TransformStatus.Cancelled,
1143 TransformStatus.SubFinished, TransformStatus.Suspended, TransformStatus.Expired]:
1144 is_terminated = True
1145
1146 transform_parameters = {'status': transform['status'],
1147 'locking': TransformLocking.Idle,
1148 'workload_id': transform['workload_id']}
1149 transform_parameters = self.load_poll_period(transform, transform_parameters)
1150 if errors:
1151 transform_parameters['errors'] = errors
1152
1153 ret = {'transform': transform,
1154 'transform_parameters': transform_parameters}
1155 return ret, is_terminated, None
1156
1157 def handle_update_itransform(self, transform, event):
1158 """
1159 Process running transform
1160 """
1161 try:
1162 log_pre = self.get_log_prefix(transform)
1163
1164 self.logger.info(log_pre + "handle_update_itransform: %s" % transform)
1165 ret, is_terminated, ret_processing_id = self.handle_update_itransform_real(transform, event)
1166 self.logger.info(log_pre + "handle_update_itransform result: %s" % str(ret))
1167 return ret, is_terminated, ret_processing_id
1168 except Exception as ex:
1169 self.logger.error(ex)
1170 self.logger.error(traceback.format_exc())
1171
1172 retries = transform['update_retries'] + 1
1173 if not transform['max_update_retries'] or retries < transform['max_update_retries']:
1174 tf_status = transform['status']
1175 else:
1176 tf_status = TransformStatus.Failed
1177 error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1178
1179
1180 update_poll_period = int(transform['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1181 if update_poll_period > self.max_update_poll_period:
1182 update_poll_period = self.max_update_poll_period
1183
1184 transform_parameters = {'status': tf_status,
1185 'update_retries': retries,
1186 'update_poll_period': update_poll_period,
1187 'errors': transform['errors'] if transform['errors'] else {},
1188 'locking': TransformLocking.Idle}
1189 transform_parameters['errors'].update(error)
1190
1191 ret = {'transform': transform, 'transform_parameters': transform_parameters}
1192 self.logger.warn(log_pre + "handle_update_itransform exception result: %s" % str(ret))
1193 return ret, False, None
1194
1195 def handle_update_generic_transform(self, transform, event):
1196 """
1197 Process running transform
1198 """
1199 try:
1200 log_pre = self.get_log_prefix(transform)
1201
1202 self.logger.info(log_pre + "handle_update_generic_transform: %s" % transform)
1203 ret, is_terminated, ret_processing_id = self.handle_update_generic_transform_real(transform, event)
1204 self.logger.info(log_pre + "handle_update_generic_transform result: %s" % str(ret))
1205 return ret, is_terminated, ret_processing_id
1206 except Exception as ex:
1207 self.logger.error(ex)
1208 self.logger.error(traceback.format_exc())
1209
1210 retries = transform['update_retries'] + 1
1211 if not transform['max_update_retries'] or retries < transform['max_update_retries']:
1212 tf_status = transform['status']
1213 else:
1214 tf_status = TransformStatus.Failed
1215 error = {'submit_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1216
1217
1218 update_poll_period = int(transform['update_poll_period'].total_seconds() * self.poll_period_increase_rate)
1219 if update_poll_period > self.max_update_poll_period:
1220 update_poll_period = self.max_update_poll_period
1221
1222 transform_parameters = {'status': tf_status,
1223 'update_retries': retries,
1224 'update_poll_period': update_poll_period,
1225 'errors': transform['errors'] if transform['errors'] else {},
1226 'locking': TransformLocking.Idle}
1227 transform_parameters['errors'].update(error)
1228
1229 ret = {'transform': transform, 'transform_parameters': transform_parameters}
1230 self.logger.warn(log_pre + "handle_update_generic_transform exception result: %s" % str(ret))
1231 return ret, False, None
1232
1233 def process_update_transform(self, event=None, transform=None):
1234 self.number_workers += 1
1235 pro_ret = ReturnCode.Ok.value
1236 try:
1237 if transform is None and event:
1238
1239
1240
1241
1242
1243
1244
1245 tf = self.get_transform(transform_id=event._transform_id, locking=True)
1246 if not tf:
1247 self.logger.error("Cannot find transform for event: %s" % str(event))
1248 pro_ret = ReturnCode.Locked.value
1249 else:
1250 transform = tf
1251 if transform:
1252 tf = transform
1253 log_pre = self.get_log_prefix(tf)
1254
1255 if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]:
1256 ret, is_terminated, ret_processing_id = self.handle_update_itransform(tf, event)
1257 elif tf['transform_type'] in [TransformType.GenericWorkflow, TransformType.GenericWork]:
1258 ret, is_terminated, ret_processing_id = self.handle_update_generic_transform(tf, event)
1259 else:
1260 ret, is_terminated, ret_processing_id = self.handle_update_transform(tf, event)
1261 new_pr_ids, update_pr_ids = self.update_transform(ret)
1262
1263 has_update_workload_id = False
1264 new_workload_id = ret.get('transform_parameters', {}).get('workload_id', None)
1265 if new_workload_id and tf['workload_id'] != new_workload_id:
1266 has_update_workload_id = True
1267 if has_update_workload_id or is_terminated:
1268 self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % tf['request_id'])
1269 event = UpdateRequestEvent(publisher_id=self.id, request_id=tf['request_id'])
1270 self.event_bus.send(event)
1271 for pr_id in new_pr_ids:
1272 self.logger.info(log_pre + "NewProcessingEvent(processing_id: %s)" % pr_id)
1273 event = NewProcessingEvent(publisher_id=self.id, processing_id=pr_id)
1274 self.event_bus.send(event)
1275 for pr_id in update_pr_ids:
1276 self.logger.info(log_pre + "UpdateProcessingEvent(processing_id: %s)" % pr_id)
1277 event = UpdateProcessingEvent(publisher_id=self.id, processing_id=pr_id)
1278 self.event_bus.send(event)
1279
1280
1281
1282
1283 except Exception as ex:
1284 self.logger.error(ex)
1285 self.logger.error(traceback.format_exc())
1286 pro_ret = ReturnCode.Failed.value
1287 self.number_workers -= 1
1288 return pro_ret
1289
1290 def handle_abort_transform(self, transform):
1291 """
1292 process abort transform
1293 """
1294 try:
1295 work = transform['transform_metadata']['work']
1296 work.set_agent_attributes(self.agent_attributes, transform)
1297
1298
1299 oldstatus = transform['status']
1300
1301 processing = work.get_processing(input_output_maps=[], without_creating=True)
1302 if processing and processing.processing_id:
1303 tf_status = TransformStatus.Cancelling
1304 else:
1305 tf_status = TransformStatus.Cancelled
1306
1307 transform_parameters = {'status': tf_status,
1308 'oldstatus': oldstatus,
1309 'locking': TransformLocking.Idle,
1310 'transform_metadata': transform['transform_metadata']}
1311 ret = {'transform': transform, 'transform_parameters': transform_parameters}
1312 return ret
1313 except Exception as ex:
1314 self.logger.error(ex)
1315 self.logger.error(traceback.format_exc())
1316 error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1317 transform_parameters = {'status': tf_status,
1318 'locking': TransformLocking.Idle,
1319 'errors': transform['errors'] if transform['errors'] else {}}
1320 transform_parameters['errors'].update(error)
1321 ret = {'transform': transform, 'transform_parameters': transform_parameters}
1322 return ret
1323 return None
1324
1325 def handle_abort_itransform(self, transform, event):
1326 """
1327 process abort transform
1328 """
1329 try:
1330 log_pre = self.get_log_prefix(transform)
1331
1332 self.logger.info(log_pre + "handle_abort_itransform: %s" % transform)
1333 prs = core_processings.get_processings(transform_id=transform['transform_id'])
1334 pr_found = None
1335 for pr in prs:
1336 if pr['processing_id'] == transform['current_processing_id']:
1337 pr_found = pr
1338 break
1339 if pr_found:
1340 self.logger.info(log_pre + "AbortProcessingEvent(processing_id: %s)" % pr['processing_id'])
1341 event = AbortProcessingEvent(publisher_id=self.id,
1342 processing_id=pr['processing_id'],
1343 content=event._content if event else None)
1344 self.event_bus.send(event)
1345
1346 transform_parameters = {'status': TransformStatus.Transforming,
1347 'substatus': TransformStatus.ToCancel,
1348 'locking': TransformLocking.Idle}
1349
1350 ret = {'transform': transform,
1351 'transform_parameters': transform_parameters}
1352 return ret
1353 except Exception as ex:
1354 self.logger.error(ex)
1355 self.logger.error(traceback.format_exc())
1356 tf_status = transform['oldstatus']
1357 error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1358 transform_parameters = {'status': tf_status,
1359 'locking': TransformLocking.Idle,
1360 'errors': transform['errors'] if transform['errors'] else {}}
1361 transform_parameters['errors'].update(error)
1362 ret = {'transform': transform, 'transform_parameters': transform_parameters}
1363 return ret
1364 return None
1365
1366 def process_abort_transform(self, event):
1367 self.number_workers += 1
1368 pro_ret = ReturnCode.Ok.value
1369 try:
1370 if event:
1371 self.logger.info("process_abort_transform: event: %s" % event)
1372 tf = self.get_transform(transform_id=event._transform_id, locking=True)
1373 if not tf:
1374 self.logger.error("Cannot find transform for event: %s" % str(event))
1375 pro_ret = ReturnCode.Locked.value
1376 else:
1377 log_pre = self.get_log_prefix(tf)
1378 self.logger.info(log_pre + "process_abort_transform")
1379
1380 if tf['status'] in [TransformStatus.Finished, TransformStatus.SubFinished,
1381 TransformStatus.Failed, TransformStatus.Cancelled,
1382 TransformStatus.Suspended, TransformStatus.Expired]:
1383 ret = {'transform': tf,
1384 'transform_parameters': {'locking': TransformLocking.Idle,
1385 'errors': {'extra_msg': "Transform is already terminated. Cannot be aborted"}}}
1386 if tf['errors'] and 'msg' in tf['errors']:
1387 ret['parameters']['errors']['msg'] = tf['errors']['msg']
1388
1389 self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret))
1390
1391 self.update_transform(ret)
1392 else:
1393 if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]:
1394 ret = self.handle_abort_itransform(tf, event)
1395 self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret))
1396 if ret:
1397 self.update_transform(ret)
1398 else:
1399 ret = self.handle_abort_transform(tf)
1400 self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret))
1401 if ret:
1402 self.update_transform(ret)
1403
1404 work = tf['transform_metadata']['work']
1405 work.set_work_id(tf['transform_id'])
1406 work.set_agent_attributes(self.agent_attributes, tf)
1407
1408 processing = work.get_processing(input_output_maps=[], without_creating=True)
1409 if processing and processing.processing_id:
1410 self.logger.info(log_pre + "AbortProcessingEvent(processing_id: %s)" % processing.processing_id)
1411 event = AbortProcessingEvent(publisher_id=self.id, processing_id=processing.processing_id, content=event._content if event else None)
1412 self.event_bus.send(event)
1413 except Exception as ex:
1414 self.logger.error(ex)
1415 self.logger.error(traceback.format_exc())
1416 pro_ret = ReturnCode.Failed.value
1417 self.number_workers -= 1
1418 return pro_ret
1419
1420 def handle_resume_transform(self, transform):
1421 """
1422 process resume transform
1423 """
1424 try:
1425 work = transform['transform_metadata']['work']
1426 work.set_agent_attributes(self.agent_attributes, transform)
1427
1428 tf_status = transform['oldstatus']
1429
1430 transform_parameters = {'status': tf_status,
1431 'locking': TransformLocking.Idle}
1432
1433 ret = {'transform': transform,
1434 'transform_parameters': transform_parameters}
1435 return ret
1436 except Exception as ex:
1437 self.logger.error(ex)
1438 self.logger.error(traceback.format_exc())
1439 error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1440 transform_parameters = {'status': tf_status,
1441 'locking': TransformLocking.Idle,
1442 'errors': transform['errors'] if transform['errors'] else {}}
1443 transform_parameters['errors'].update(error)
1444 ret = {'transform': transform, 'transform_parameters': transform_parameters}
1445 return ret
1446 return None
1447
1448 def handle_resume_itransform(self, transform, event):
1449 """
1450 process resume transform
1451 """
1452 try:
1453 log_pre = self.get_log_prefix(transform)
1454
1455 self.logger.info(log_pre + "handle_resume_itransform: %s" % transform)
1456 prs = core_processings.get_processings(transform_id=transform['transform_id'])
1457 pr_found = None
1458 for pr in prs:
1459 if pr['processing_id'] == transform['current_processing_id']:
1460 pr_found = pr
1461 break
1462
1463 if pr_found:
1464 self.logger.info(log_pre + "ResumeProcessingEvent(processing_id: %s)" % pr['processing_id'])
1465 event = ResumeProcessingEvent(publisher_id=self.id,
1466 processing_id=pr['processing_id'],
1467 content=event._content if event else None)
1468 self.event_bus.send(event)
1469
1470 transform_parameters = {'status': TransformStatus.Transforming,
1471 'substatus': TransformStatus.ToResume,
1472 'locking': TransformLocking.Idle}
1473
1474 ret = {'transform': transform,
1475 'transform_parameters': transform_parameters}
1476 return ret
1477 except Exception as ex:
1478 self.logger.error(ex)
1479 self.logger.error(traceback.format_exc())
1480 tf_status = transform['oldstatus']
1481 error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
1482 transform_parameters = {'status': tf_status,
1483 'locking': TransformLocking.Idle,
1484 'errors': transform['errors'] if transform['errors'] else {}}
1485 transform_parameters['errors'].update(error)
1486 ret = {'transform': transform, 'transform_parameters': transform_parameters}
1487 return ret
1488 return None
1489
1490 def process_resume_transform(self, event):
1491 self.number_workers += 1
1492 pro_ret = ReturnCode.Ok.value
1493 try:
1494 if event:
1495 self.logger.info("process_resume_transform: event: %s" % event)
1496 tf = self.get_transform(transform_id=event._transform_id, locking=True)
1497 if not tf:
1498 self.logger.error("Cannot find transform for event: %s" % str(event))
1499 pro_ret = ReturnCode.Locked.value
1500 else:
1501 log_pre = self.get_log_prefix(tf)
1502
1503 if tf['status'] in [TransformStatus.Finished]:
1504 ret = {'transform': tf,
1505 'transform_parameters': {'locking': TransformLocking.Idle,
1506 'errors': {'extra_msg': "Transform is already finished. Cannot be resumed"}}}
1507 if tf['errors'] and 'msg' in tf['errors']:
1508 ret['parameters']['errors']['msg'] = tf['errors']['msg']
1509
1510 self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret))
1511 self.update_transform(ret)
1512 else:
1513 if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]:
1514 ret = self.handle_resume_itransform(tf, event)
1515 self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret))
1516 if ret:
1517 self.update_transform(ret)
1518 else:
1519 ret = self.handle_resume_transform(tf)
1520 self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret))
1521 if ret:
1522 self.update_transform(ret)
1523
1524 work = tf['transform_metadata']['work']
1525 work.set_agent_attributes(self.agent_attributes, tf)
1526
1527 processing = work.get_processing(input_output_maps=[], without_creating=True)
1528 if processing and processing.processing_id:
1529 self.logger.info(log_pre + "ResumeProcessingEvent(processing_id: %s)" % processing.processing_id)
1530 event = ResumeProcessingEvent(publisher_id=self.id,
1531 processing_id=processing.processing_id,
1532 content=event._content if event else None)
1533 self.event_bus.send(event)
1534 else:
1535 self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % tf['transform_id'])
1536 event = UpdateTransformEvent(publisher_id=self.id,
1537 transform_id=tf['transform_id'],
1538 content=event._content if event else None)
1539 self.event_bus.send(event)
1540 except Exception as ex:
1541 self.logger.error(ex)
1542 self.logger.error(traceback.format_exc())
1543 pro_ret = ReturnCode.Failed.value
1544 self.number_workers -= 1
1545 return pro_ret
1546
1547 def clean_locks(self, force=False):
1548 try:
1549 self.logger.info(f"clean locking: force: {force}")
1550 health_items = self.get_health_items()
1551 min_request_id = BaseAgent.min_request_id
1552 hostname, pid, thread_id, thread_name = self.get_process_thread_info()
1553 core_transforms.clean_locking(health_items=health_items, min_request_id=min_request_id,
1554 time_period=self.clean_locks_time_period,
1555 force=force, hostname=hostname, pid=pid)
1556 except Exception as ex:
1557 self.logger.info(f"Failed clean locking: {ex}")
1558
1559 def init_event_function_map(self):
1560 self.event_func_map = {
1561 EventType.QueueTransform: {
1562 'pre_check': self.is_ok_to_run_more_transforms,
1563 'exec_func': self.process_queue_transform
1564 },
1565 EventType.NewTransform: {
1566 'pre_check': self.is_ok_to_run_more_transforms,
1567 'exec_func': self.process_new_transform
1568 },
1569 EventType.UpdateTransform: {
1570 'pre_check': self.is_ok_to_run_more_transforms,
1571 'exec_func': self.process_update_transform
1572 },
1573 EventType.AbortTransform: {
1574 'pre_check': self.is_ok_to_run_more_transforms,
1575 'exec_func': self.process_abort_transform
1576 },
1577 EventType.ResumeTransform: {
1578 'pre_check': self.is_ok_to_run_more_transforms,
1579 'exec_func': self.process_resume_transform
1580 }
1581 }
1582
1583 def run(self):
1584 """
1585 Main run function.
1586 """
1587 try:
1588 self.logger.info("Starting main thread")
1589 self.init_thread_info()
1590
1591 self.load_plugins()
1592
1593 self.add_default_tasks()
1594 self.clean_locks(force=True)
1595
1596 self.init_event_function_map()
1597
1598 task = self.create_task(task_func=self.get_queue_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
1599 self.add_task(task)
1600 task = self.create_task(task_func=self.get_new_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
1601 self.add_task(task)
1602 task = self.create_task(task_func=self.get_running_transforms, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=10, priority=1)
1603 self.add_task(task)
1604 task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
1605 self.add_task(task)
1606 task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
1607 self.add_task(task)
1608
1609 self.execute()
1610 except KeyboardInterrupt:
1611 self.stop()
1612
1613
if __name__ == '__main__':
    # Run the Transformer agent as a standalone process; calling the agent
    # instance starts its main loop.
    agent = Transformer()
    agent()