File indexing completed on 2026-04-09 07:58:33
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import copy
0012 import datetime
0013 import json
0014 import logging
0015 import inspect
0016 import os
0017 import random
0018 import uuid
0019
0020
0021 from idds.common import exceptions
0022 from idds.common.constants import IDDSEnum, WorkStatus
0023 from idds.common.utils import json_dumps, setup_logging, get_proxy
0024 from idds.common.utils import str_to_date
0025 from .base import Base
0026 from .work import Work, Collection
0027
0028
# Project logging setup (idds.common.utils.setup_logging) for this module; runs once at import time.
setup_logging(__name__)
0030
0031
class ConditionOperator(IDDSEnum):
    """How CompositeCondition combines the results of its conditions."""

    # Every condition must hold.
    And = 0
    # At least one condition must hold.
    Or = 1
0035
0036
class ConditionTrigger(IDDSEnum):
    """Selection mode for CompositeCondition.get_next_works."""

    # Select works whose 'triggered' flag is still False.
    NotTriggered = 0
    # Select not-yet-triggered works and mark them as triggered.
    ToTrigger = 1
    # Select works whose 'triggered' flag is already True.
    Triggered = 2
0041
0042
class CompositeCondition(Base):
    """Combine boolean conditions and select follow-up works from the result.

    ``conditions`` are normally bound methods of Work/Workflow objects; each
    one is called to obtain its value, while a non-callable entry is judged by
    its truthiness. ``operator`` decides how the values are combined: all must
    be true for ``ConditionOperator.And``, at least one for
    ``ConditionOperator.Or``.

    When the combined result is true, ``get_next_works`` draws from
    ``true_works``, otherwise from ``false_works``. Either list may contain
    nested ``CompositeCondition`` objects, which are evaluated recursively.
    A per-work ``{'triggered': bool}`` record is kept in the object metadata
    under ``'true_works'`` / ``'false_works'``.
    """

    def __init__(self, operator=ConditionOperator.And, conditions=None, true_works=None, false_works=None, logger=None):
        """Create a composite condition.

        :param operator: ``ConditionOperator.And`` or ``ConditionOperator.Or``.
        :param conditions: one condition or a list/tuple of conditions; each is
            checked by ``validate_conditions``.
        :param true_works: one item or a list/tuple of items to release when the
            combined condition is true.
        :param false_works: one item or a list/tuple of items to release when the
            combined condition is false.
        :param logger: optional logger; a logger named after the class is used
            when omitted.
        :raises IDDSException: if ``conditions`` is not list-like.
        """
        # Backing storage for the properties defined below.
        self._conditions = []
        self._true_works = []
        self._false_works = []

        super(CompositeCondition, self).__init__()

        self.internal_id = str(uuid.uuid4())[:8]
        self.template_id = self.internal_id

        self.logger = logger
        if self.logger is None:
            self.setup_logger()

        # Always store fresh lists: tuples become appendable (add_condition and
        # list concatenation below), and a caller's list -- including a shared
        # mutable default argument -- is never aliased and mutated.
        conditions = self._as_new_list(conditions)
        true_works = self._as_new_list(true_works)
        false_works = self._as_new_list(false_works)
        self.validate_conditions(conditions)

        self.operator = operator
        self.conditions = conditions
        self.true_works = true_works
        self.false_works = false_works

    @staticmethod
    def _as_new_list(value):
        """Normalize a constructor argument to a new list.

        ``None`` becomes ``[]``; a list or tuple is copied into a new list; any
        other truthy value is wrapped as ``[value]``. Other falsy values are
        returned unchanged so that validation still rejects them.
        """
        if value is None:
            return []
        if type(value) in [tuple, list]:
            return list(value)
        if value:
            return [value]
        return value

    def get_class_name(self):
        """Return the name of the concrete class (also used as the logger name)."""
        return self.__class__.__name__

    def get_internal_id(self):
        """Return the 8-character id generated at construction."""
        return self.internal_id

    def get_template_id(self):
        """Return the template id (initialized to the internal id)."""
        return self.template_id

    def copy(self):
        """Return a deep copy of this condition (see ``__deepcopy__``)."""
        new_cond = copy.deepcopy(self)
        return new_cond

    def __deepcopy__(self, memo):
        """Deep-copy every attribute except the logger, which the copy shares.

        The logger is detached while copying and always restored afterwards,
        even if copying an attribute raises.
        """
        logger = self.logger
        self.logger = None
        try:
            cls = self.__class__
            result = cls.__new__(cls)
            # Register early so references back to self resolve to the copy.
            memo[id(self)] = result
            for k, v in self.__dict__.items():
                setattr(result, k, copy.deepcopy(v, memo))
        finally:
            self.logger = logger
        result.logger = logger
        return result

    def setup_logger(self):
        """Attach a logger named after the concrete class."""
        self.logger = logging.getLogger(self.get_class_name())

    def log_info(self, info):
        """Log ``info`` at INFO level, creating the logger if needed."""
        if self.logger is None:
            self.setup_logger()
        self.logger.info(info)

    def log_debug(self, info):
        """Log ``info`` at DEBUG level, creating the logger if needed."""
        if self.logger is None:
            self.setup_logger()
        self.logger.debug(info)

    def log_error(self, info):
        """Log ``info`` at ERROR level, creating the logger if needed."""
        if self.logger is None:
            self.setup_logger()
        self.logger.error(info)

    @property
    def conditions(self):
        """List of conditions (callables, or serialized dicts before ``load_conditions``)."""
        return self._conditions

    @conditions.setter
    def conditions(self, value):
        self._conditions = value

    @property
    def true_works(self):
        """Items released when the combined condition is true."""
        return self._true_works

    @true_works.setter
    def true_works(self, value):
        self._true_works = value
        self._register_works('true_works', value)

    @property
    def false_works(self):
        """Items released when the combined condition is false."""
        return self._false_works

    @false_works.setter
    def false_works(self, value):
        self._false_works = value
        self._register_works('false_works', value)

    def _register_works(self, meta_key, works):
        """Ensure each Work/Workflow/CompositeCondition in ``works`` has a metadata entry.

        Existing entries (and their ``triggered`` flags) are kept; new ones
        start as ``{'triggered': False}``. Other items (None, unresolved id
        strings) are ignored.
        """
        work_meta = self.get_metadata_item(meta_key, {})
        for work in works:
            if isinstance(work, (Work, CompositeCondition, Workflow)):
                work_meta.setdefault(work.get_internal_id(), {'triggered': False})
        self.add_metadata_item(meta_key, work_meta)

    def validate_conditions(self, conditions):
        """Check that ``conditions`` is a list/tuple of bound methods.

        :raises IDDSException: if ``conditions`` is not a list or tuple.
        The per-item checks use ``assert`` and are skipped under ``python -O``.
        """
        if type(conditions) not in [tuple, list]:
            raise exceptions.IDDSException("conditions must be list")
        for cond in conditions:
            assert (inspect.ismethod(cond))

    def add_condition(self, cond):
        """Append a bound method of a Work as an additional condition."""
        assert (inspect.ismethod(cond))
        assert (isinstance(cond.__self__, Work))
        self._conditions.append(cond)

    def load_metadata(self):
        """No-op hook; metadata is consumed lazily by the properties."""
        pass

    @staticmethod
    def _condition_to_dict(cond):
        """Serialize one condition entry for ``to_dict``."""
        if inspect.ismethod(cond):
            owner = cond.__self__
            # A nested condition is embedded whole; any other owner (Work,
            # Workflow, ...) is referenced by its internal id.
            if isinstance(owner, CompositeCondition) and not isinstance(owner, Work):
                return {'idds_method': cond.__name__,
                        'idds_method_condition': owner.to_dict()}
            return {'idds_method': cond.__name__,
                    'idds_method_internal_id': owner.get_internal_id()}
        if hasattr(cond, '__self__'):
            return {'idds_attribute': cond.__name__,
                    'idds_method_internal_id': cond.__self__.get_internal_id()}
        return cond

    @staticmethod
    def _work_to_dict(w):
        """Serialize one true/false work entry for ``to_dict``."""
        if isinstance(w, Work):
            return w.get_internal_id()
        if isinstance(w, CompositeCondition):
            return w.to_dict()
        if isinstance(w, Workflow):
            return w.get_internal_id()
        return w

    def to_dict(self):
        """Serialize this condition into a dict of class, module and attributes.

        Conditions and works are replaced by references (method name plus
        owner internal id, or nested dicts); the logger is dropped; all other
        attributes go through ``to_dict_l``.
        """
        ret = {'class': self.__class__.__name__,
               'module': self.__class__.__module__,
               'attributes': {}}
        for key, value in self.__dict__.items():
            if key.startswith('__'):
                continue
            if key == 'logger':
                value = None
            elif key == '_conditions':
                value = [self._condition_to_dict(cond) for cond in value]
            elif key in ['_true_works', '_false_works']:
                value = [self._work_to_dict(w) for w in value]
            else:
                value = self.to_dict_l(value)
            ret['attributes'][key] = value
        return ret

    def get_work_from_id(self, work_id, works):
        """Look up ``work_id`` in ``works``; raises KeyError when it is missing."""
        return works[work_id]

    def clean(self):
        """Reset 'triggered' flags of works that were triggered but never submitted.

        Both the true and the false branch are processed; nested conditions
        are cleaned recursively, and the updated flags are stored back.
        """
        for meta_key, branch_works in (('true_works', self.true_works),
                                       ('false_works', self.false_works)):
            work_meta = self.get_metadata_item(meta_key, {})
            for work in branch_works:
                if isinstance(work, CompositeCondition):
                    work.clean()
                    continue
                if not isinstance(work, (Work, Workflow)):
                    # Unresolved entries (e.g. id strings) carry no submit state.
                    continue
                work_id = work.get_internal_id()
                if work_id in work_meta and work_meta[work_id]['triggered'] and not work.submitted:
                    work_meta[work_id]['triggered'] = False
            self.add_metadata_item(meta_key, work_meta)

    def _load_condition(self, cond, works):
        """Resolve one condition entry produced by ``to_dict`` back into a callable.

        Callables and non-dict values are returned unchanged; unresolved
        references are logged and returned as the original dict.
        """
        if callable(cond) or not isinstance(cond, dict):
            return cond
        if 'idds_method' in cond and 'idds_method_internal_id' in cond:
            self.log_debug("idds_method_internal_id: %s" % cond['idds_method_internal_id'])
            self.log_debug("idds_method: %s" % cond['idds_method'])
            internal_id = cond['idds_method_internal_id']
            work = self.get_work_from_id(internal_id, works)
            self.log_debug("get_work_from_id: %s: [%s]" % (internal_id, [work]))
            if work is not None:
                return getattr(work, cond['idds_method'])
            self.log_error("Condition method work cannot be found for %s" % (internal_id))
            return cond
        if 'idds_attribute' in cond and 'idds_method_internal_id' in cond:
            internal_id = cond['idds_method_internal_id']
            work = self.get_work_from_id(internal_id, works)
            if work is not None:
                return getattr(work, cond['idds_attribute'])
            self.log_error("Condition attribute work cannot be found for %s" % (internal_id))
            return cond
        if 'idds_method' in cond and 'idds_method_condition' in cond:
            # NOTE(review): assumes the nested condition has already been rebuilt
            # into an object; getattr on a plain dict would raise AttributeError.
            return getattr(cond['idds_method_condition'], cond['idds_method'])
        return cond

    def _load_works(self, items, works, label):
        """Resolve a true/false works list; ``label`` ('True'/'False') prefixes log messages."""
        resolved = []
        for w in items:
            if isinstance(w, CompositeCondition):
                w.load_conditions(works)
                resolved.append(w)
            elif isinstance(w, (Workflow, Work)):
                resolved.append(w)
            elif type(w) in [str]:
                work = self.get_work_from_id(w, works)
                if work is None:
                    self.log_error("%s work cannot be found for %s" % (label, str(w)))
                    work = w
                resolved.append(work)
            else:
                self.log_error("%s work cannot be found for type(%s): %s" % (label, type(w), str(w)))
                resolved.append(w)
        return resolved

    def load_conditions(self, works):
        """Re-bind serialized conditions and work references to live objects.

        :param works: mapping of internal id -> work, used by ``get_work_from_id``.
        """
        self.log_debug("load_conditions conditions: %s" % self.conditions)
        self.log_debug("load_conditions works: %s" % str(works))
        self.conditions = [self._load_condition(cond, works) for cond in self.conditions]

        self.log_debug("true_works: %s" % str(self.true_works))
        self.true_works = self._load_works(self.true_works, works, 'True')
        self.false_works = self._load_works(self.false_works, works, 'False')

    def all_works(self):
        """Return all referenced works: condition owners first, then next works."""
        return self.all_pre_works() + self.all_next_works()

    def all_condition_ids(self):
        """Return internal ids of Work/Workflow condition owners, recursively.

        Non-method conditions are logged and included as-is.
        """
        ids = []
        for cond in self.conditions:
            if inspect.ismethod(cond):
                owner = cond.__self__
                if isinstance(owner, (Work, Workflow)):
                    ids.append(owner.get_internal_id())
                elif isinstance(owner, CompositeCondition):
                    ids.extend(owner.all_condition_ids())
            else:
                self.log_error("cond cannot be recognized: %s" % str(cond))
                ids.append(cond)
        for work in [*self.true_works, *self.false_works]:
            if isinstance(work, CompositeCondition):
                ids.extend(work.all_condition_ids())
        return ids

    def all_pre_works(self):
        """Return the Work/Workflow objects owning the conditions, recursively.

        Non-method conditions are logged and included as-is.
        """
        pre_works = []
        for cond in self.conditions:
            if inspect.ismethod(cond):
                owner = cond.__self__
                if isinstance(owner, (Work, Workflow)):
                    pre_works.append(owner)
                elif isinstance(owner, CompositeCondition):
                    pre_works.extend(owner.all_pre_works())
            else:
                self.log_error("cond cannot be recognized: %s" % str(cond))
                pre_works.append(cond)
        for work in [*self.true_works, *self.false_works]:
            if isinstance(work, CompositeCondition):
                pre_works.extend(work.all_pre_works())
        return pre_works

    def all_next_works(self):
        """Return every item of both branches, expanding nested conditions."""
        next_works = []
        for work in [*self.true_works, *self.false_works]:
            if isinstance(work, CompositeCondition):
                next_works.extend(work.all_next_works())
            else:
                next_works.append(work)
        return next_works

    def get_current_cond_status(self, cond):
        """Return the boolean value of one condition (called if callable)."""
        return bool(cond()) if callable(cond) else bool(cond)

    def get_cond_status(self):
        """Combine all conditions with ``operator`` (And: all true, Or: any true)."""
        statuses = (self.get_current_cond_status(cond) for cond in self.conditions)
        if self.operator == ConditionOperator.And:
            return all(statuses)
        return any(statuses)

    def get_condition_status(self):
        """Alias of ``get_cond_status``."""
        return self.get_cond_status()

    def is_condition_true(self):
        """True when the combined condition holds."""
        return bool(self.get_cond_status())

    def is_condition_false(self):
        """True when the combined condition does not hold."""
        return not self.get_cond_status()

    def _select_branch_works(self, branch, branch_works, trigger):
        """Apply ``trigger`` to one branch ('true' or 'false') and store its flags."""
        meta_key = '%s_works' % branch
        work_meta = self.get_metadata_item(meta_key, {})
        self.log_debug(f"get_next_works {branch}_work_meta: {work_meta}")
        selected = []
        for work in branch_works:
            if isinstance(work, CompositeCondition):
                selected.extend(work.get_next_works(trigger=trigger))
                continue
            state = work_meta.setdefault(work.get_internal_id(), {'triggered': False})
            if trigger == ConditionTrigger.ToTrigger:
                if not state['triggered']:
                    state['triggered'] = True
                    selected.append(work)
            elif trigger == ConditionTrigger.NotTriggered:
                if not state['triggered']:
                    selected.append(work)
            elif trigger == ConditionTrigger.Triggered:
                if state['triggered']:
                    selected.append(work)
        self.add_metadata_item(meta_key, work_meta)
        return selected

    def get_next_works(self, trigger=ConditionTrigger.NotTriggered):
        """Return the works selected by the current condition value.

        :param trigger: ``NotTriggered`` returns works not yet triggered,
            ``Triggered`` returns works already triggered, and ``ToTrigger``
            returns not-yet-triggered works and marks them as triggered.
        :returns: works from ``true_works`` when the condition holds, otherwise
            from ``false_works``; nested conditions are expanded.
        """
        # Evaluate once so the logged status and the chosen branch always agree
        # (condition callables are not guaranteed to be deterministic).
        cond_status = self.get_cond_status()
        self.log_debug(f"get_next_works get_cond_status: {cond_status} trigger: {trigger}")
        if cond_status:
            return self._select_branch_works('true', self.true_works, trigger)
        return self._select_branch_works('false', self.false_works, trigger)
0488
0489
class AndCondition(CompositeCondition):
    """CompositeCondition that holds only when all conditions hold (true if there are none)."""

    def __init__(self, conditions=None, true_works=None, false_works=None, logger=None):
        # ``None`` instead of a mutable ``[]`` default: the base class stores the
        # list it receives as its conditions, so a shared default list could be
        # mutated by add_condition on one instance and leak into every other.
        super(AndCondition, self).__init__(operator=ConditionOperator.And,
                                           conditions=conditions,
                                           true_works=true_works,
                                           false_works=false_works,
                                           logger=logger)
0497
0498
class OrCondition(CompositeCondition):
    """CompositeCondition that holds when at least one condition holds (false if there are none)."""

    def __init__(self, conditions=None, true_works=None, false_works=None, logger=None):
        # ``None`` instead of a mutable ``[]`` default: the base class stores the
        # list it receives as its conditions, so a shared default list could be
        # mutated by add_condition on one instance and leak into every other.
        super(OrCondition, self).__init__(operator=ConditionOperator.Or,
                                          conditions=conditions,
                                          true_works=true_works,
                                          false_works=false_works,
                                          logger=logger)
0506
0507
class Condition(CompositeCondition):
    """Single-condition convenience wrapper around CompositeCondition (And operator).

    ``cond``, ``true_work`` and ``false_work`` are each wrapped in a one-item
    list, or an empty list when falsy. ``current_work`` is accepted for
    signature compatibility and is not used here.
    """

    def __init__(self, cond=None, current_work=None, true_work=None, false_work=None, logger=None):
        def as_list(item):
            return [item] if item else []

        super(Condition, self).__init__(operator=ConditionOperator.And,
                                        conditions=as_list(cond),
                                        true_works=as_list(true_work),
                                        false_works=as_list(false_work),
                                        logger=logger)

    @property
    def cond(self):
        """The first condition, or None when there is none."""
        current = self.conditions
        return current[0] if current else None

    @cond.setter
    def cond(self, value):
        self.conditions = [value]

    @property
    def true_work(self):
        """NOTE: returns the whole ``true_works`` list (not its first item) when non-empty, else None."""
        current = self.true_works
        return current if current else None

    @true_work.setter
    def true_work(self, value):
        self.true_works = [value]

    @property
    def false_work(self):
        """NOTE: returns the whole ``false_works`` list (not its first item) when non-empty, else None."""
        current = self.false_works
        return current if current else None

    @false_work.setter
    def false_work(self, value):
        self.false_works = [value]
0543
0544
class TemplateCondition(CompositeCondition):
    """Single-condition variant whose condition and works must belong to Work objects."""

    def __init__(self, cond=None, current_work=None, true_work=None, false_work=None, logger=None):
        # Reject non-Work targets before any base-class initialization happens.
        for label, candidate in (('true_work', true_work), ('false_work', false_work)):
            if candidate is not None and not isinstance(candidate, Work):
                raise exceptions.IDDSException("%s can only be set with Work class" % label)

        super(TemplateCondition, self).__init__(operator=ConditionOperator.And,
                                                conditions=[cond] if cond else [],
                                                true_works=[true_work] if true_work else [],
                                                false_works=[false_work] if false_work else [],
                                                logger=logger)

    def validate_conditions(self, conditions):
        """Allow at most one condition, which must be a bound method of a Work.

        :raises IDDSException: if ``conditions`` is not a list/tuple or has more than one item.
        """
        if type(conditions) not in [tuple, list]:
            raise exceptions.IDDSException("conditions must be list")
        if len(conditions) > 1:
            raise exceptions.IDDSException("Condition class can only support one condition. To support multiple condition, please use CompositeCondition.")
        for candidate in conditions:
            assert (inspect.ismethod(candidate))
            assert (isinstance(candidate.__self__, Work))

    def add_condition(self, cond):
        """Always raises: a TemplateCondition holds a single condition."""
        raise exceptions.IDDSException("Condition class doesn't support add_condition. To support multiple condition, please use CompositeCondition.")
0569
0570
class ParameterLink(Base):
    """Carry attribute values from a source work to a destination work.

    Each parameter is stored as ``{'source': attr, 'destination': name}``
    under its position key ('0', '1', ...). ``set_parameters`` reads the
    source attributes from a work and stores the values in metadata;
    ``get_parameters`` returns them keyed by destination name.
    """

    def __init__(self, parameters):
        """
        :param parameters: a string, a ``{'source': ..., 'destination': ...}``
            dict, or a list/tuple of those. A string uses the same name for
            source and destination. Falsy entries are skipped.
        :raises Exception: for any other entry format.
        """
        super(ParameterLink, self).__init__()
        self.parameters = {}
        self.num_parameters = 0
        if parameters:
            if type(parameters) not in [list, tuple]:
                parameters = [parameters]
            for p in parameters:
                if not p:
                    continue
                if type(p) in [str]:
                    source, destination = p, p
                elif type(p) in [dict] and 'source' in p and 'destination' in p:
                    source, destination = p['source'], p['destination']
                else:
                    raise Exception("Cannot parse the parameters format. Accepted format: list of string or dict{'source': <>, 'destination': <>}")
                self.parameters[str(self.num_parameters)] = {'source': source, 'destination': destination}
                self.num_parameters += 1

        self.internal_id = str(uuid.uuid4())[:8]
        self.template_id = self.internal_id

    def get_internal_id(self):
        """Return the 8-character id generated at construction."""
        return self.internal_id

    def get_parameter_value(self, work, p):
        """Return the value of attribute ``p`` of ``work``.

        Callable attributes are called. Missing or falsy attributes yield
        ``None``. A ``Collection`` result is converted with ``to_origin_dict``.
        """
        # Default must be None, not the string 'None': the string is truthy
        # and would be returned as the parameter value for a missing attribute.
        p_f = getattr(work, p, None)
        if not p_f:
            return None
        ret = p_f() if callable(p_f) else p_f
        if ret and type(ret) in [Collection] and hasattr(ret, 'to_origin_dict'):
            ret = ret.to_origin_dict()
        return ret

    def set_parameters(self, work):
        """Read every source attribute from ``work`` and store the values in metadata."""
        p_values = {p: self.get_parameter_value(work, spec['source'])
                    for p, spec in self.parameters.items()}
        self.add_metadata_item('parameters', p_values)

    def get_parameters(self):
        """Return stored values keyed by destination name (only parameters already set)."""
        p_values = self.get_metadata_item('parameters', {})
        return {spec['destination']: p_values[p]
                for p, spec in self.parameters.items() if p in p_values}
0623
0624
0625 class WorkflowBase(Base):
0626
0627 def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None, logger=None):
0628 """
0629 Init a workflow.
0630 """
0631 self._works = {}
0632 self._conditions = {}
0633 self._work_conds = {}
0634
0635 self.parameter_links = {}
0636 self.parameter_links_source = {}
0637 self.parameter_links_destination = {}
0638
0639 self._global_parameters = {}
0640
0641 super(WorkflowBase, self).__init__()
0642
0643 self.internal_id = str(uuid.uuid4())[:8]
0644 self.template_work_id = self.internal_id
0645
0646 self.lifetime = lifetime
0647 self.pending_time = pending_time
0648
0649 if name:
0650
0651 self._name = name
0652 else:
0653 self._name = 'idds.workflow.' + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S_%f") + str(random.randint(1, 1000))
0654
0655 if workload_id is None:
0656
0657 pass
0658 self.workload_id = workload_id
0659
0660 self.logger = logger
0661 if self.logger is None:
0662 self.setup_logger()
0663
0664 self.build_work = None
0665 self._works = {}
0666 self.works = {}
0667 self.work_sequence = {}
0668
0669 self.next_works = []
0670
0671 self.terminated_works = []
0672 self.initial_works = []
0673
0674 self.primary_initial_work = None
0675 self.independent_works = []
0676
0677 self.first_initial = False
0678 self.new_to_run_works = []
0679 self.submitting_works = []
0680 self.current_running_works = []
0681
0682 self.num_subfinished_works = 0
0683 self.num_finished_works = 0
0684 self.num_failed_works = 0
0685 self.num_cancelled_works = 0
0686 self.num_suspended_works = 0
0687 self.num_expired_works = 0
0688 self.num_total_works = 0
0689
0690 self.last_work = None
0691
0692 self.last_updated_at = datetime.datetime.utcnow()
0693 self.expired = False
0694
0695 self.to_update_transforms = {}
0696
0697
0698 self.user_defined_conditions = {}
0699
0700 self.username = None
0701 self.userdn = None
0702 self.proxy = None
0703
0704 self._loop_condition_position = 'end'
0705 self.loop_condition = None
0706
0707 self.num_run = None
0708
0709 self.global_parameters = {}
0710
0711 self.to_cancel = False
0712
0713 self.synchronized = False
0714
0715 self.additional_data_storage = None
0716
0717 self.campaign = None
0718 self.campaign_scope = None
0719 self.campaign_group = None
0720 self.campaign_tag = None
0721 self.max_processing_requests = -1
0722
0723 self._request_cache = None
0724
0725 self.with_steps = False
0726
0727 self.is_workflow_step = False
0728 self.step_name = None
0729 self.workflow_data = None
0730
0731 """
0732 self._running_data_names = []
0733 for name in ['internal_id', 'template_work_id', 'workload_id', 'work_sequence', 'terminated_works',
0734 'first_initial', 'new_to_run_works', 'current_running_works',
0735 'num_subfinished_works', 'num_finished_works', 'num_failed_works', 'num_cancelled_works', 'num_suspended_works',
0736 'num_expired_works', 'num_total_works', 'last_work']:
0737 self._running_data_names.append(name)
0738 for name in ['works']:
0739 self._running_data_names.append(name)
0740 """
0741
0742 @property
0743 def name(self):
0744 return self._name
0745
0746 @name.setter
0747 def name(self, value):
0748 self._name = value
0749
0750 @property
0751 def is_workflow_step(self):
0752 return self._is_workflow_step
0753
0754 @is_workflow_step.setter
0755 def is_workflow_step(self, value):
0756 self._is_workflow_step = value
0757
0758 @property
0759 def step_name(self):
0760 return self._step_name
0761
0762 @step_name.setter
0763 def step_name(self, value):
0764 self._step_name = value
0765
0766 @property
0767 def workflow_data(self):
0768 return self._workflow_data
0769
0770 @workflow_data.setter
0771 def workflow_data(self, value):
0772 self._workflow_data = value
0773
0774 @property
0775 def campaign(self):
0776 return self._campaign
0777
0778 @campaign.setter
0779 def campaign(self, value):
0780 self._campaign = value
0781
0782 @property
0783 def campaign_scope(self):
0784 return self._campaign_scope
0785
0786 @campaign_scope.setter
0787 def campaign_scope(self, value):
0788 self._campaign_scope = value
0789
0790 @property
0791 def campaign_group(self):
0792 return self._campaign_group
0793
0794 @campaign_group.setter
0795 def campaign_group(self, value):
0796 self._campaign_group = value
0797
0798 @property
0799 def campaign_tag(self):
0800 return self._campaign_tag
0801
0802 @campaign_tag.setter
0803 def campaign_tag(self, value):
0804 self._campaign_tag = value
0805
0806 @property
0807 def request_cache(self):
0808 return self._request_cache
0809
0810 @request_cache.setter
0811 def request_cache(self, value):
0812 self._request_cache = value
0813
0814 def set_request_cache(self, value):
0815 self.request_cache = value
0816
0817 @property
0818 def max_processing_requests(self):
0819 return self._max_processing_requests
0820
0821 @max_processing_requests.setter
0822 def max_processing_requests(self, value):
0823 self._max_processing_requests = value
0824
0825 def get_template_work_id(self):
0826 return self.template_work_id
0827
0828 def get_template_id(self):
0829 return self.template_work_id
0830
0831 @property
0832 def workload_id(self):
0833 return self.get_metadata_item('workload_id')
0834
0835 @workload_id.setter
0836 def workload_id(self, value):
0837 self.add_metadata_item('workload_id', value)
0838
0839 @property
0840 def lifetime(self):
0841
0842 return getattr(self, '_lifetime', None)
0843
0844 @lifetime.setter
0845 def lifetime(self, value):
0846
0847 self._lifetime = value
0848
0849 @property
0850 def pending_time(self):
0851
0852 return getattr(self, '_pending_time', None)
0853
0854 @pending_time.setter
0855 def pending_time(self, value):
0856
0857 self._pending_time = value
0858
0859 @property
0860 def last_updated_at(self):
0861 last_updated_at = self.get_metadata_item('last_updated_at', None)
0862 if last_updated_at and type(last_updated_at) in [str]:
0863 last_updated_at = str_to_date(last_updated_at)
0864 return last_updated_at
0865
0866 @last_updated_at.setter
0867 def last_updated_at(self, value):
0868 self.add_metadata_item('last_updated_at', value)
0869
0870 def has_new_updates(self):
0871 self.last_updated_at = datetime.datetime.utcnow()
0872
0873 @property
0874 def expired(self):
0875 t = self.get_metadata_item('expired', False)
0876 if type(t) in [bool]:
0877 return t
0878 elif type(t) in [str] and t.lower() in ['true']:
0879 return True
0880 else:
0881 return False
0882
0883 @expired.setter
0884 def expired(self, value):
0885 self.add_metadata_item('expired', value)
0886
0887 @property
0888 def works(self):
0889 return self._works
0890
0891 @works.setter
0892 def works(self, value):
0893 self._works = value
0894 work_metadata = {}
0895 if self._works:
0896 for k in self._works:
0897 work = self._works[k]
0898 if isinstance(work, Workflow):
0899 work_metadata[k] = {'type': 'workflow',
0900 'metadata': work.metadata}
0901 else:
0902 work_metadata[k] = {'type': 'work',
0903 'work_id': work.work_id,
0904 'workload_id': work.workload_id,
0905 'external_id': work.external_id,
0906 'status': work.status.value if work.status else work.status,
0907 'substatus': work.substatus.value if work.substatus else work.substatus,
0908 'next_works': work.next_works,
0909 'signature': work.signature,
0910 'transforming': work.transforming}
0911 self.add_metadata_item('works', work_metadata)
0912
0913 def refresh_works(self, clean=False):
0914 work_metadata = {}
0915 if clean:
0916 self.submitting_works = []
0917 for cond_id in self.conditions:
0918 cond = self.conditions[cond_id]
0919 cond.clean()
0920
0921 if self._works:
0922 for k in self._works:
0923 work = self._works[k]
0924 if isinstance(work, Workflow):
0925 work.refresh_works()
0926 work_metadata[k] = {'type': 'workflow',
0927 'metadata': work.metadata}
0928 else:
0929 work_metadata[k] = {'type': 'work',
0930 'work_id': work.work_id,
0931 'workload_id': work.workload_id,
0932 'external_id': work.external_id,
0933 'status': work.status.value if work.status else work.status,
0934 'substatus': work.substatus.value if work.substatus else work.substatus,
0935 'next_works': work.next_works,
0936 'signature': work.signature,
0937 'transforming': work.transforming}
0938 if work.last_updated_at and (not self.last_updated_at or work.last_updated_at > self.last_updated_at):
0939 self.last_updated_at = work.last_updated_at
0940 self.add_metadata_item('works', work_metadata)
0941
0942 def load_works(self):
0943 work_metadata = self.get_metadata_item('works', {})
0944 for k in self._works:
0945 if k in work_metadata:
0946 if work_metadata[k]['type'] == 'work':
0947 self._works[k].work_id = work_metadata[k]['work_id']
0948 self._works[k].workload_id = work_metadata[k]['workload_id'] if 'workload_id' in work_metadata[k] else None
0949 self._works[k].external_id = work_metadata[k]['external_id'] if 'external_id' in work_metadata[k] else None
0950 self._works[k].transforming = work_metadata[k]['transforming']
0951 if 'signature' in work_metadata[k]:
0952 self._works[k].signature = work_metadata[k]['signature']
0953 self._works[k].status = WorkStatus(work_metadata[k]['status']) if work_metadata[k]['status'] else work_metadata[k]['status']
0954 self._works[k].substatus = WorkStatus(work_metadata[k]['substatus']) if work_metadata[k]['substatus'] else work_metadata[k]['substatus']
0955 self._works[k].next_works = work_metadata[k]['next_works'] if 'next_works' in work_metadata[k] else []
0956 elif work_metadata[k]['type'] == 'workflow':
0957 self._works[k].metadata = work_metadata[k]['metadata']
0958
0959 work = self._works[k]
0960 if work.last_updated_at and (not self.last_updated_at or work.last_updated_at > self.last_updated_at):
0961 self.last_updated_at = work.last_updated_at
0962
0963 @property
0964 def next_works(self):
0965 return self.get_metadata_item('next_works', [])
0966
0967 @next_works.setter
0968 def next_works(self, value):
0969 self.add_metadata_item('next_works', value)
0970
0971 @property
0972 def conditions(self):
0973 return self._conditions
0974
0975 @conditions.setter
0976 def conditions(self, value):
0977 self._conditions = value
0978 conditions_metadata = {}
0979 if self._conditions:
0980 for k in self._conditions:
0981 conditions_metadata[k] = self._conditions[k].metadata
0982 self.add_metadata_item('conditions', conditions_metadata)
0983
0984 @property
0985 def work_conds(self):
0986 return self._work_conds
0987
0988 @work_conds.setter
0989 def work_conds(self, value):
0990 self._work_conds = value
0991
0992
    def load_work_conditions(self):
        """Restore condition metadata from the 'conditions' metadata item.

        For each known condition id present in the saved metadata, the
        condition's metadata is replaced. A CompositeCondition is then
        re-bound to the current ``self.works`` via ``load_conditions``;
        Work/Workflow entries need no further loading here.
        """
        # NOTE(review): indentation was lost in this copy of the file; the
        # type dispatch is reconstructed as nested under the metadata check --
        # confirm against the upstream source.
        conditions_metadata = self.get_metadata_item('conditions', {})

        for cond_internal_id in self._conditions:
            if cond_internal_id in conditions_metadata:
                self.conditions[cond_internal_id].metadata = conditions_metadata[cond_internal_id]
                if isinstance(self.conditions[cond_internal_id], Workflow):
                    # Nothing to load for a workflow-valued condition.
                    pass
                elif isinstance(self.conditions[cond_internal_id], Work):
                    # Nothing to load for a work-valued condition.
                    pass
                elif isinstance(self.conditions[cond_internal_id], CompositeCondition):
                    # Replace serialized references with live work objects.
                    self.conditions[cond_internal_id].load_conditions(self.works)
                    pass
1009
1010
1011
1012 @property
1013 def global_parameters(self):
1014 self._global_parameters = self.get_metadata_item('gp', {})
1015 return self._global_parameters
1016
1017 @global_parameters.setter
1018 def global_parameters(self, value):
1019 self._global_parameters = value
1020 gp_metadata = {}
1021 if self._global_parameters:
1022 for key in self._global_parameters:
1023 if key.startswith("user_"):
1024 gp_metadata[key] = self._global_parameters[key]
1025 else:
1026 self.logger.warn("Only parameters start with 'user_' can be set as global parameters. The parameter '%s' will be ignored." % (key))
1027 self.add_metadata_item('gp', gp_metadata)
1028
1029 def set_global_parameters(self, value):
1030 self.global_parameters = value
1031
@property
def sliced_global_parameters(self):
    """Sliced global parameters of this workflow.

    Re-read from the 'sliced_gp' metadata item on every access; the cached
    ``_sliced_global_parameters`` attribute is refreshed at the same time.
    """
    self._sliced_global_parameters = self.get_metadata_item('sliced_gp', {})
    return self._sliced_global_parameters

@sliced_global_parameters.setter
def sliced_global_parameters(self, value):
    """Store ``value`` and persist only its ``user_``-prefixed keys to 'sliced_gp' metadata.

    Keys without the ``user_`` prefix are left out of the persisted copy and
    a warning is logged for each of them.
    """
    self._sliced_global_parameters = value
    gp_metadata = {}
    if self._sliced_global_parameters:
        for key in self._sliced_global_parameters:
            if key.startswith("user_"):
                gp_metadata[key] = self._sliced_global_parameters[key]
            else:
                # Logger.warn is a deprecated alias of warning(); also create the
                # logger lazily, as log_info/log_debug do, instead of crashing on None.
                if self.logger is None:
                    self.setup_logger()
                self.logger.warning("Only parameters start with 'user_' can be set as global parameters. The parameter '%s' will be ignored." % (key))
    self.add_metadata_item('sliced_gp', gp_metadata)
1048
def set_sliced_global_parameters(self, source, name=None, index=0):
    """Record slicing info ``{'name': name, 'index': index}`` for global parameter ``source``."""
    current = self.sliced_global_parameters
    current.update({source: {'name': name, 'index': index}})
    self.sliced_global_parameters = current
1054
def sync_global_parameters(self, global_parameters, sliced_global_parameters=None):
    """Merge ``global_parameters`` into this workflow's global parameters.

    Incoming values overwrite existing keys; the merged mapping is stored back
    through the ``global_parameters`` setter. ``sliced_global_parameters`` is
    accepted but not used here.
    """
    merged = self.global_parameters
    merged.update({key: global_parameters[key] for key in global_parameters})
    self.global_parameters = merged
1060
def sync_global_parameters_from_work(self, work):
    """Refresh global parameter values from a terminated ``work``.

    Only applies to ``Work`` instances with non-empty global parameters. For
    each known key, a value found by
    ``work.get_global_parameter_from_output_data(key)`` takes precedence;
    otherwise a same-named attribute of the work is used. The result is
    re-stored through ``set_global_parameters``.
    """
    self.log_debug("work %s (%s) is_terminated, global_parameters: %s" % (work.get_internal_id(), str(work.metadata),
                                                                          str(self.global_parameters)))
    if not isinstance(work, Work) or not self.global_parameters:
        return
    for key in self.global_parameters:
        found, result = work.get_global_parameter_from_output_data(key)
        self.log_debug("work %s get_global_parameter_from_output_data(key: %s) results(%s:%s)" % (work.get_internal_id(), key, found, result))
        if found:
            self.global_parameters[key] = result
        elif hasattr(work, key):
            self.global_parameters[key] = getattr(work, key)
    self.set_global_parameters(self.global_parameters)
1074
@property
def loop_condition(self):
    """Condition object controlling loop re-execution (falsy when unset)."""
    return self._loop_condition

@loop_condition.setter
def loop_condition(self, condition):
    self._loop_condition = condition
    if not self._loop_condition:
        return
    # Persist the current status so it survives serialization of the metadata.
    self.add_metadata_item('loop_condition', self._loop_condition.get_condition_status())
1085
@property
def work_sequence(self):
    """Mapping of sequence number (as a string) -> internal id of the work sequenced there."""
    return self.get_metadata_item('work_sequence', {})

@work_sequence.setter
def work_sequence(self, sequence):
    self.add_metadata_item('work_sequence', sequence)

@property
def terminated_works(self):
    """Internal ids of works already seen as terminated (metadata-backed list)."""
    return self.get_metadata_item('terminated_works', [])

@terminated_works.setter
def terminated_works(self, work_ids):
    self.add_metadata_item('terminated_works', work_ids)

@property
def first_initial(self):
    """Whether ``first_initialize`` has already run (defaults to False)."""
    return self.get_metadata_item('first_initial', False)

@first_initial.setter
def first_initial(self, done):
    self.add_metadata_item('first_initial', done)
1109
@property
def internal_id_relation_map(self):
    """Mapping of work internal id -> parent internal id, as stored by order_independent_works.

    Defaults to an empty dict (previously ``False``), so callers can always
    use mapping operations such as ``.get``. Both defaults are falsy, so
    truthiness checks behave as before.
    """
    return self.get_metadata_item('internal_id_relation_map', {})

@internal_id_relation_map.setter
def internal_id_relation_map(self, value):
    self.add_metadata_item('internal_id_relation_map', value)
1117
@property
def to_start_works(self):
    """Internal ids of works queued to be started (metadata-backed list)."""
    return self.get_metadata_item('to_start_works', [])

@to_start_works.setter
def to_start_works(self, work_ids):
    self.add_metadata_item('to_start_works', work_ids)

@property
def new_to_run_works(self):
    """Internal ids of works prepared to run that are not yet transforming."""
    return self.get_metadata_item('new_to_run_works', [])

@new_to_run_works.setter
def new_to_run_works(self, work_ids):
    self.add_metadata_item('new_to_run_works', work_ids)

@property
def submitting_works(self):
    """Internal ids of works that are transforming but not yet submitted."""
    return self.get_metadata_item('submitting_works', [])

@submitting_works.setter
def submitting_works(self, work_ids):
    self.add_metadata_item('submitting_works', work_ids)

@property
def current_running_works(self):
    """Internal ids of works currently running."""
    return self.get_metadata_item('current_running_works', [])

@current_running_works.setter
def current_running_works(self, work_ids):
    self.add_metadata_item('current_running_works', work_ids)
1149
@property
def num_subfinished_works(self):
    """Metadata-backed ``num_subfinished_works`` counter (defaults to 0)."""
    return self.get_metadata_item('num_subfinished_works', 0)

@num_subfinished_works.setter
def num_subfinished_works(self, count):
    self.add_metadata_item('num_subfinished_works', count)

@property
def num_finished_works(self):
    """Metadata-backed ``num_finished_works`` counter (defaults to 0)."""
    return self.get_metadata_item('num_finished_works', 0)

@num_finished_works.setter
def num_finished_works(self, count):
    self.add_metadata_item('num_finished_works', count)

@property
def num_failed_works(self):
    """Metadata-backed ``num_failed_works`` counter (defaults to 0)."""
    return self.get_metadata_item('num_failed_works', 0)

@num_failed_works.setter
def num_failed_works(self, count):
    self.add_metadata_item('num_failed_works', count)

@property
def num_cancelled_works(self):
    """Metadata-backed ``num_cancelled_works`` counter (defaults to 0)."""
    return self.get_metadata_item('num_cancelled_works', 0)

@num_cancelled_works.setter
def num_cancelled_works(self, count):
    self.add_metadata_item('num_cancelled_works', count)

@property
def num_suspended_works(self):
    """Metadata-backed ``num_suspended_works`` counter (defaults to 0)."""
    return self.get_metadata_item('num_suspended_works', 0)

@num_suspended_works.setter
def num_suspended_works(self, count):
    self.add_metadata_item('num_suspended_works', count)

@property
def num_expired_works(self):
    """Metadata-backed ``num_expired_works`` counter (defaults to 0)."""
    return self.get_metadata_item('num_expired_works', 0)

@num_expired_works.setter
def num_expired_works(self, count):
    self.add_metadata_item('num_expired_works', count)

@property
def num_total_works(self):
    """Number of works sequenced so far; also the next sequence id (defaults to 0)."""
    return self.get_metadata_item('num_total_works', 0)

@num_total_works.setter
def num_total_works(self, count):
    self.add_metadata_item('num_total_works', count)
1205
@property
def last_work(self):
    """Internal id of the most recently sequenced work, or None."""
    return self.get_metadata_item('last_work', None)

@last_work.setter
def last_work(self, work_id):
    self.add_metadata_item('last_work', work_id)

@property
def init_works(self):
    """Internal ids recorded as init works (metadata-backed list)."""
    return self.get_metadata_item('init_works', [])

@init_works.setter
def init_works(self, work_ids):
    self.add_metadata_item('init_works', work_ids)

@property
def to_update_transforms(self):
    """Metadata-backed ``to_update_transforms`` mapping (defaults to {})."""
    return self.get_metadata_item('to_update_transforms', {})

@to_update_transforms.setter
def to_update_transforms(self, transforms):
    self.add_metadata_item('to_update_transforms', transforms)

@property
def num_run(self):
    """Run number of this workflow (defaults to 0)."""
    return self.get_metadata_item('num_run', 0)

@num_run.setter
def num_run(self, run):
    self.add_metadata_item('num_run', run)

@property
def parent_num_run(self):
    """Run label inherited from the parent workflow (defaults to 0)."""
    return self.get_metadata_item('parent_num_run', 0)

@parent_num_run.setter
def parent_num_run(self, run):
    self.add_metadata_item('parent_num_run', run)

@property
def to_cancel(self):
    """Whether cancellation has been requested (defaults to False)."""
    return self.get_metadata_item('to_cancel', False)

@to_cancel.setter
def to_cancel(self, flag):
    self.add_metadata_item('to_cancel', flag)
1253
def is_with_steps(self):
    """Return the ``with_steps`` flag (set once the workflow has been split into steps)."""
    return getattr(self, 'with_steps')

def set_additional_data_storage(self, storage):
    """Remember the name of the additional data storage used for work data."""
    self.additional_data_storage = storage

def get_additional_data_storage(self):
    """Return the additional data storage name set by ``set_additional_data_storage``."""
    return getattr(self, 'additional_data_storage')
1262
def create_workflow_step(self, batches):
    """Build one step ``Workflow`` carrying the data of a batch of works.

    :param batches: iterable of ``(work_name, data_file)`` pairs, where
        ``data_file['filename']`` names a JSON file to load.
    :returns: a new ``Workflow`` that shares this workflow's internal id,
        campaign/user settings and workload id, is flagged
        ``is_workflow_step``, and has ``step_name`` set to the comma-joined
        work names and ``workflow_data`` set to ``self.zip_data`` of the
        loaded data.
    """
    step = Workflow()
    step.set_internal_id(self.get_internal_id())
    copied_attrs = ('campaign', 'campaign_scope', 'campaign_group', 'campaign_tag',
                    'max_processing_requests', 'name', 'username', 'userdn', 'lifetime')
    for attr in copied_attrs:
        setattr(step, attr, getattr(self, attr))
    step.workload_id = self.get_workload_id()
    step.is_workflow_step = True

    loaded = {}
    names = []
    for work_name, data_file in batches:
        with open(data_file['filename'], 'r') as handle:
            loaded[work_name] = json.load(handle)
        names.append(work_name)
    zipped = self.zip_data(loaded)
    step.step_name = ",".join(names)
    step.workflow_data = zipped
    return step
1291
def split_workflow_to_steps(self, request_cache=None, max_request_length=None):
    """Split this workflow into step workflows when its serialized size is too large.

    Splitting happens only when ``request_cache`` and ``max_request_length``
    are both set and ``len(json_dumps(self))`` exceeds ``max_request_length``.
    In that case:

    - work data is moved into files under ``<request_cache>/<internal_id>``;
    - the files are grouped greedily into batches whose summed ``zip_size``
      fits ``max_request_length`` (a single oversized item forms its own
      batch);
    - one step workflow is created per batch.

    Messages go through the class logger (``log_info``) rather than the root
    logger.

    :returns: list of step workflows, or ``[]`` when no split is needed.
    """
    data_length = len(json_dumps(self))
    if not (request_cache and max_request_length and data_length > max_request_length):
        self.log_info(f"Workflow size {data_length} <= max request length {max_request_length} or max request length is not defined, will not split it into steps")
        return []

    self.log_info(f"Workflow size {data_length} > max request length {max_request_length}, will split it into steps")
    storage = os.path.join(request_cache, self.get_internal_id())
    # exist_ok makes a separate existence check unnecessary.
    os.makedirs(storage, exist_ok=True)
    storage_name = "IDDS_WORKFLOW_ADDITIONAL_STORAGE"

    self.set_additional_data_storage(storage_name)
    data_files = self.convert_data_to_additional_data_storage(storage, storage_name=storage_name)

    workflow_steps = [self.create_workflow_step(batch)
                      for batch in self._batch_data_files(data_files, max_request_length)]

    self.with_steps = True
    return workflow_steps

@staticmethod
def _batch_data_files(data_files, max_size):
    """Greedily group ``data_files`` items into batches with summed ``zip_size`` <= ``max_size``.

    Returns a list of lists of ``(work_name, data_file)`` pairs, in input order.
    An item larger than ``max_size`` on its own still forms a single-item batch.
    """
    batches = []
    current_batch = []
    current_size = 0
    for work_name, data_file in data_files.items():
        zip_size = data_file['zip_size']
        if current_batch and current_size + zip_size > max_size:
            batches.append(current_batch)
            current_batch = []
            current_size = 0
        current_batch.append((work_name, data_file))
        current_size += zip_size
    if current_batch:
        batches.append(current_batch)
    return batches
1330
def refresh(self):
    """Refresh the state of every work (delegates to ``refresh_works``)."""
    self.refresh_works()

def load_metadata(self):
    """Re-hydrate works, then work conditions, then parameter links from stored metadata."""
    for loader in (self.load_works, self.load_work_conditions, self.load_parameter_links):
        loader()
1341
def get_class_name(self):
    """Return the name of this object's class."""
    cls = self.__class__
    return cls.__name__

def setup_logger(self):
    """
    Create a logger named after the class and attach it as ``self.logger``.
    """
    logger_name = self.get_class_name()
    self.logger = logging.getLogger(logger_name)

def log_info(self, info):
    """Log ``info`` at INFO level, creating the logger first if it is missing."""
    if self.logger is None:
        self.setup_logger()
    logger = self.logger
    logger.info(info)

def log_debug(self, info):
    """Log ``info`` at DEBUG level, creating the logger first if it is missing."""
    if self.logger is None:
        self.setup_logger()
    logger = self.logger
    logger.debug(info)
1360
def set_internal_id(self, value):
    """Override the internal id of this object."""
    self.internal_id = value

def get_internal_id(self):
    """Return the internal id of this object."""
    return self.internal_id

def copy(self):
    """Return a deep copy of this object (via ``copy.deepcopy``)."""
    duplicate = copy.deepcopy(self)
    return duplicate
1370
def __deepcopy__(self, memo):
    """Deep-copy this object while sharing, not copying, its logger.

    The logger is detached during the copy and re-attached in a ``finally``
    block. A failure while copying an attribute therefore no longer leaves
    ``self.logger`` set to None. The copy receives the same logger object.
    """
    logger = self.logger
    self.logger = None
    try:
        cls = self.__class__
        result = cls.__new__(cls)
        # Register the copy before recursing so self-references resolve to it.
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            setattr(result, k, copy.deepcopy(v, memo))
    finally:
        self.logger = logger
    result.logger = logger
    return result
1387
def get_works(self):
    """Return the mapping of internal id -> work held by this workflow."""
    return self.works

def get_combined_num_run(self):
    """Return the run label ``"<parent_num_run>_<num_run>"``.

    A part that is unset (falsy or empty when stringified) is omitted. If both
    parts are unset, the empty string is returned.
    """
    labels = [str(run) for run in (self.parent_num_run, self.num_run) if run and len(str(run))]
    return "_".join(labels)
1402
def get_new_work_to_run(self, work_id, new_parameters=None):
    """Prepare the work (or nested workflow) ``work_id`` for its next run and record it.

    Nested ``Workflow``:
        Propagate the combined run label, the global parameters and the cancel
        flag, then sync it. Assign it the next sequence id and add it to
        ``current_running_works``.

    ``Work`` not already in ``new_to_run_works``:
        Apply the parameters linked to it and assign the next sequence id and
        run label. Initialize it and push the global parameters into it. When it
        has no parent workload yet, chain it after the previously sequenced
        work. Finally add it to ``new_to_run_works``.

    :param work_id: internal id of a work in ``self.works``.
    :param new_parameters: NOTE(review): effectively unused. It is overwritten
        by ``get_destination_parameters`` in the ``Work`` branch and never read
        in the ``Workflow`` branch.
    :returns: the (same) work object, prepared in place.
    """
    work = self.works[work_id]
    # Cleared for the new run; presumably a fresh workload id is assigned later.
    work.workload_id = None

    if isinstance(work, Workflow):
        work.parent_num_run = self.get_combined_num_run()
        work.sync_global_parameters(self.global_parameters, self.sliced_global_parameters)
        work.sync_works(to_cancel=self.to_cancel)

        work.sequence_id = self.num_total_works

        # Re-assign so the ``works`` property setter runs (presumably refreshing stored metadata).
        works = self.works
        self.works = works

        # Mutates the dict/list returned by the metadata getters in place;
        # relies on those getters returning the stored objects by reference.
        self.work_sequence[str(self.num_total_works)] = work.get_internal_id()
        self.num_total_works += 1

        self.current_running_works.append(work.get_internal_id())
        self.last_work = work.get_internal_id()
    else:
        # A work already queued in new_to_run_works is not prepared twice.
        if work.get_internal_id() not in self.new_to_run_works:
            new_parameters = self.get_destination_parameters(work_id)
            if new_parameters:
                work.set_parameters(new_parameters)
            work.sequence_id = self.num_total_works

            work.num_run = self.get_combined_num_run()
            work.initialize_work()
            work.sync_global_parameters(self.global_parameters, self.sliced_global_parameters)
            work.renew_parameters_from_attributes()
            # Chain after the previously sequenced work when no parent workload was set.
            if work.parent_workload_id is None and self.num_total_works > 0:
                last_work_id = self.work_sequence[str(self.num_total_works - 1)]
                last_work = self.works[last_work_id]
                if isinstance(last_work, Work):
                    work.parent_workload_id = last_work.workload_id
                    last_work.add_next_work(work.get_internal_id())
            # Re-assign so the ``works`` property setter runs.
            works = self.works
            self.works = works

            self.work_sequence[str(self.num_total_works)] = work.get_internal_id()
            self.num_total_works += 1
            self.new_to_run_works.append(work.get_internal_id())
            self.last_work = work.get_internal_id()

    return work
1451
def get_new_parameters_for_work(self, work):
    """Refresh ``work`` before it is handed out to run.

    Steps, in order:

    - apply the parameters linked to ``work``;
    - set its sequence id to the current ``num_total_works``;
    - initialize it and push the global parameters into it;
    - renew its parameters from its attributes.

    :returns: the same ``work`` object.
    """
    linked = self.get_destination_parameters(work.get_internal_id())
    if linked:
        work.set_parameters(linked)
    work.sequence_id = self.num_total_works

    work.initialize_work()
    work.sync_global_parameters(self.global_parameters, self.sliced_global_parameters)
    work.renew_parameters_from_attributes()
    # Re-assign so the ``works`` property setter runs.
    self.works = self.works
    return work
1464
def register_user_defined_condition(self, condition):
    """Store the source code of function ``condition``, keyed by its ``__name__``."""
    source = inspect.getsource(condition)
    self.user_defined_conditions[condition.__name__] = source
1468
def load_user_defined_condition(self):
    """Execute the stored source of every registered user-defined condition.

    NOTE(review): ``exec`` runs arbitrary code taken from
    ``user_defined_conditions``. If that mapping can come from a submitted
    request, this is code execution on untrusted input -- confirm the trust
    boundary.

    NOTE(review): ``exec`` is called without explicit globals/locals inside a
    function. Any ``def`` in the source is therefore bound in a temporary
    locals mapping that is discarded on return, so the defined functions do
    not appear to become reachable afterwards -- confirm intent.
    """
    for cond_src_name in self.user_defined_conditions:

        exec(self.user_defined_conditions[cond_src_name])
1479
def set_workload_id(self, workload_id):
    """Store the workload id assigned to this workflow."""
    self.workload_id = workload_id

def get_workload_id(self):
    """Return the workload id stored by ``set_workload_id``."""
    return getattr(self, 'workload_id')
1485
def _get_from_primary_work(self, getter_name):
    """Call method ``getter_name`` on the primary initial work (or the first work).

    Best-effort: any failure (no works, unknown id, getter error) is logged
    at debug level and None is returned.
    """
    try:
        work_id = self.primary_initial_work
        if not work_id:
            work_id = list(self.works.keys())[0]
        work = self.works[work_id]
        return getattr(work, getter_name)()
    except Exception as ex:
        self.log_debug("cannot get %s from primary work: %s" % (getter_name, ex))
    return None

def get_site(self):
    """Return the site of the primary initial work (or first work), or None."""
    return self._get_from_primary_work('get_site')

def get_cloud(self):
    """Return the cloud of the primary initial work (or first work), or None."""
    return self._get_from_primary_work('get_cloud')

def get_queue(self):
    """Return the queue of the primary initial work (or first work), or None."""
    return self._get_from_primary_work('get_queue')
1518
def add_initial_works(self, work):
    """Register ``work`` as an initial work; it becomes primary if none is set yet."""
    work_id = work.get_internal_id()
    self.initial_works.append(work_id)
    if self.primary_initial_work is None:
        self.primary_initial_work = work_id

def add_work(self, work, initial=False, primary=False):
    """Add ``work`` to this workflow.

    Every added work is also listed as independent until a condition makes it
    a follow-up work. ``initial``/``primary`` additionally register it as an
    (primary) initial work. Adding a work re-arms ``first_initialize``.
    """
    self.first_initial = False
    work_id = work.get_internal_id()
    self.works[work_id] = work
    if initial:
        if primary:
            self.primary_initial_work = work_id
        self.add_initial_works(work)

    self.independent_works.append(work_id)
1533
def add_build_work(self, work, initial=False, primary=False):
    """Install ``work`` as this workflow's build work and flag it via ``set_build_work``.

    ``initial`` and ``primary`` are accepted for signature parity with
    ``add_work`` but are not used.
    """
    self.build_work = work
    build = self.build_work
    build.set_build_work()
1537
def convert_data_to_additional_data_storage(self, storage, storage_name=None, replace_storage_name=False):
    """Ask every work to move its data into ``storage``.

    Works whose conversion returns a falsy result are skipped.

    :returns: ``{work_name: {'filename': ..., 'size': ..., 'zip_size': ...}}``
        built from each work's ``(work_name, filename, size, zip_size)`` result.
    """
    data_files = {}
    for work_id in self.works:
        work = self.works[work_id]
        converted = work.convert_data_to_additional_data_storage(storage, storage_name=storage_name,
                                                                 replace_storage_name=replace_storage_name)
        if converted:
            work_name, filename, size, zip_size = converted
            data_files[work_name] = dict(filename=filename, size=size, zip_size=zip_size)
        self.works[work_id] = work
    return data_files
1548
def get_build_work(self):
    """Return the build work set by ``add_build_work`` (may be None)."""
    return self.build_work

def has_to_build_work(self):
    """Return True when a build work has been set."""
    return self.build_work is not None
1556
def add_condition(self, cond):
    """Register condition ``cond`` on this workflow.

    Effects:

    - stores ``cond`` in ``conditions``;
    - indexes it in ``work_conds`` under each of its pre-works;
    - removes its next works from ``independent_works``, since they are now
      triggered by the condition;
    - re-arms ``first_initialize``.

    :raises AssertionError: if a work referenced by ``cond`` has not been
        added to this workflow. It is raised explicitly, rather than with an
        ``assert`` statement, so the check is not stripped under ``python -O``.
    """
    self.first_initial = False
    known_works = self.get_works()
    for cond_work in cond.all_works():
        if cond_work.get_internal_id() not in known_works:
            raise AssertionError("Work %s used in condition %s is not part of the workflow"
                                 % (cond_work.get_internal_id(), cond.get_internal_id()))

    conditions = self.conditions
    conditions[cond.get_internal_id()] = cond
    self.conditions = conditions

    work_conds = self.work_conds
    for work in cond.all_pre_works():
        work_conds.setdefault(work.get_internal_id(), []).append(cond.get_internal_id())
    self.work_conds = work_conds

    for next_work in cond.all_next_works():
        next_id = next_work.get_internal_id()
        if next_id in self.independent_works:
            self.independent_works.remove(next_id)
1583
def find_workflow_from_work(self, work):
    """Return the workflow (this one or a nested one) whose ``_works`` directly holds ``work``.

    Nested workflows are searched depth-first in insertion order. Returns
    None when the work is not found.
    """
    if work.get_internal_id() in self._works:
        return self
    for child in self._works.values():
        if not isinstance(child, Workflow):
            continue
        owner = child.find_workflow_from_work(work)
        if owner:
            return owner
    return None
1595
def add_parameter_link(self, work_source, work_destinations, parameter_link):
    """Link parameters produced by ``work_source`` to one or more destination works.

    The link id is recorded:

    - under the source work, in the workflow that owns the source;
    - under each destination work, in the workflow that owns that
      destination. That workflow also stores the link object itself.

    :param work_destinations: a single work or a list/tuple of works.
    :raises Exception: if the source or any destination is not in this
        workflow tree.
    """
    link_id = parameter_link.get_internal_id()
    source_owner = self.find_workflow_from_work(work_source)
    if not source_owner:
        raise Exception("Cannot find work %s in the workflow." % work_source.get_internal_id())
    source_owner.parameter_links_source.setdefault(work_source.get_internal_id(), []).append(link_id)

    if type(work_destinations) not in (list, tuple):
        work_destinations = [work_destinations]
    for dest in work_destinations:
        dest_owner = self.find_workflow_from_work(dest)
        if not dest_owner:
            raise Exception("Cannot find work %s in the workflow." % dest.get_internal_id())
        dest_owner.parameter_links.setdefault(link_id, parameter_link)
        dest_owner.parameter_links_destination.setdefault(dest.get_internal_id(), []).append(link_id)
1615
def find_parameter_links_from_id(self, internal_id):
    """Collect ``(workflow, link)`` pairs for link ``internal_id`` across this workflow tree.

    This workflow's own entry (if any) comes first, followed by matches from
    nested workflows in insertion order.
    """
    matches = [(self, self.parameter_links[internal_id])] if internal_id in self.parameter_links else []
    for child in self._works.values():
        if isinstance(child, Workflow):
            matches.extend(child.find_parameter_links_from_id(internal_id))
    return matches
1626
def refresh_parameter_links(self):
    """Persist each parameter link's metadata under the 'parameter_links' metadata item."""
    self.add_metadata_item('parameter_links',
                           {link_id: link.metadata for link_id, link in self.parameter_links.items()})

def get_parameter_links_metadata(self):
    """Persist and return ``{link_id: link.metadata}`` for every parameter link."""
    snapshot = {link_id: link.metadata for link_id, link in self.parameter_links.items()}
    self.add_metadata_item('parameter_links', snapshot)
    return snapshot

def set_parameter_links_metadata(self, p_links):
    """Assign metadata from ``p_links`` to the parameter links this workflow knows about."""
    for link_id, link in self.parameter_links.items():
        if link_id in p_links:
            link.metadata = p_links[link_id]
1645
def set_source_parameters(self, internal_id):
    """Feed work ``internal_id`` into every parameter link it is a source of.

    After ``set_parameters(work)`` is called on a link, the workflow that
    owns the link re-persists its parameter-link metadata.
    """
    source_work = self.works[internal_id]
    for link_id in self.parameter_links_source.get(internal_id, []):
        for owner, link in self.find_parameter_links_from_id(link_id):
            link.set_parameters(source_work)
            owner.refresh_parameter_links()
1660
def get_destination_parameters(self, internal_id):
    """Merge the parameters of all links targeting work ``internal_id``.

    Later links override earlier ones on key clashes. Returns ``{}`` when
    the work has no incoming links.
    """
    merged = {}
    for link_id in self.parameter_links_destination.get(internal_id, []):
        merged.update(self.parameter_links[link_id].get_parameters())
    return merged
1669
def load_parameter_links(self):
    """Restore each known parameter link's metadata from the 'parameter_links' metadata item."""
    saved = self.get_metadata_item('parameter_links', {})
    for link_id, link in self.parameter_links.items():
        if link_id in saved:
            link.metadata = saved[link_id]

def add_next_work(self, work_id):
    """Append ``work_id`` to ``next_works`` and store the list back."""
    pending = self.next_works
    pending.append(work_id)
    self.next_works = pending
1680
def enable_next_works(self, work, cond):
    """Start the works that ``cond`` marks as to-be-triggered after ``work``.

    Each triggered next work is prepared with ``get_new_work_to_run`` and
    recorded as a next work of ``work``. When ``work`` is a ``Work``, its
    workload id becomes the prepared work's ``parent_workload_id``.

    :returns: list of the prepared works (empty when nothing is triggered).
    """
    self.log_info("Work %s condition: %s" % (work.get_internal_id(), cond.conditions))
    triggered = cond.get_next_works(trigger=ConditionTrigger.ToTrigger)
    self.log_info("Work %s condition status %s" % (work.get_internal_id(), cond.get_cond_status()))
    self.log_info("Work %s next works %s" % (work.get_internal_id(), str(triggered)))
    prepared = []
    if triggered is None:
        return prepared
    for candidate in triggered:
        self.log_info(f"next work {candidate.get_internal_id()} is_submitted: {candidate.submitted}, transforming: {candidate.transforming}")
        ready = self.get_new_work_to_run(candidate.get_internal_id())
        work.add_next_work(ready.get_internal_id())
        if isinstance(work, Work):
            ready.parent_workload_id = work.workload_id
        prepared.append(ready)
    return prepared
1708
def add_loop_condition(self, condition, position='end'):
    """Attach a loop condition, evaluated at ``position`` (default ``'end'``)."""
    self.loop_condition_position = position
    self.loop_condition = condition

def has_loop_condition(self):
    """Return True when a (truthy) loop condition is set."""
    return bool(self.loop_condition)

def get_loop_condition_status(self):
    """Evaluate the loop condition against this workflow's works; False when none is set."""
    if not self.has_loop_condition():
        return False
    self.loop_condition.load_conditions(self.works)
    return self.loop_condition.get_condition_status()
1724
def __str__(self):
    """Return the JSON serialization of this object (via ``json_dumps``)."""
    serialized = json_dumps(self)
    return str(serialized)
1727
def format_works(self, works):
    """Set ``parent_internal_ids`` on each work from its ancestry task names.

    Task names are resolved to internal ids using every work in this workflow
    tree. A work's own task name is excluded. Ancestry names that match no
    work are skipped with an info log; previously they slipped past the
    ``.get`` filter and then raised KeyError.

    :returns: ``works`` itself (falsy input is returned unchanged).
    """
    if not works:
        return works
    name_to_id = {w.task_name: w.get_internal_id() for w in self.get_all_works(synchronize=False)}

    for work in works:
        parent_task_names = work.get_ancestry_works()
        parent_internal_ids = []
        if parent_task_names:
            own_internal_id = work.get_internal_id()
            for t_name in parent_task_names:
                if t_name not in name_to_id:
                    self.log_info("work %s: ancestry task %s is not in the workflow, ignored" % (own_internal_id, t_name))
                    continue
                if name_to_id[t_name] != own_internal_id:
                    parent_internal_ids.append(name_to_id[t_name])
        work.parent_internal_ids = parent_internal_ids
    return works
1746
def get_new_works(self, synchronize=True):
    """
    *** Function called by Marshaller agent.

    Return the new works that are ready to be started.

    Steps:

    1. Optionally sync works.
    2. Release nothing while any work is still in ``submitting_works``.
    3. Turn queued ``to_start_works`` into runnable works via
       ``get_new_work_to_run``.
    4. Collect the works in ``new_to_run_works`` (refreshing their
       parameters), plus new works of nested workflows.
    5. Fill missing ``parent_internal_id`` values from
       ``internal_id_relation_map`` and pass the list through
       ``format_works``.

    :param synchronize: when true, call ``sync_works`` first.
    :returns: list of works, as returned by ``format_works``.
    """
    # NOTE(review): uses self.logger directly (unlike log_info/log_debug), so a None logger would raise here.
    self.logger.info(f"{self.get_internal_id()} get_new_works, to_cancel: {self.to_cancel}")

    if synchronize:
        self.sync_works(to_cancel=self.to_cancel)
    works = []

    self.logger.info(f"{self.get_internal_id()} submitting_works: {self.submitting_works}")
    if self.submitting_works:
        # Do not release new works while any work is still in submitting_works.
        return self.format_works(works)

    if self.to_start_works:
        self.logger.info("%s to_start_works: %s" % (self.get_internal_id(), str(self.to_start_works)))
        to_start_works = self.to_start_works.copy()
        init_works = self.init_works
        starting_works = []
        for work_id in to_start_works:
            starting_works.append(work_id)
        # NOTE(review): the loop above copies every queued id, so this fallback
        # appears unreachable -- confirm whether a filter was intended there.
        if not starting_works:
            work_id = to_start_works.pop(0)
            starting_works.append(work_id)
        for work_id in starting_works:
            self.get_new_work_to_run(work_id)
            # Only the first started work is recorded as the init work.
            if not init_works:
                init_works.append(work_id)
                self.init_works = init_works
            # Mutates the list returned by the getter in place; presumably the stored metadata list.
            if work_id in self.to_start_works:
                self.to_start_works.remove(work_id)
        self.logger.info("%s starting_works: %s" % (self.get_internal_id(), str(starting_works)))

    self.logger.info(f"{self.get_internal_id()} new_to_run_works: {self.new_to_run_works}")
    for k in self.new_to_run_works:
        if isinstance(self.works[k], Work):
            self.works[k] = self.get_new_parameters_for_work(self.works[k])
            works.append(self.works[k])
        if isinstance(self.works[k], Workflow):
            works = works + self.works[k].get_new_works(synchronize=False)
            self.works[k].transforming = True

    # Nested workflows that are already running may have new works of their own.
    for k in self.current_running_works:
        if isinstance(self.works[k], Workflow):
            works = works + self.works[k].get_new_works(synchronize=False)

    self.logger.info("%s get_new_works done" % self.get_internal_id())

    # Fill parent links (stored by order_independent_works) for works that have none yet.
    internal_id_relation_map = self.internal_id_relation_map
    self.logger.debug(f"internal_id_relation_map: {internal_id_relation_map}")
    if internal_id_relation_map and works:
        for i, w in enumerate(works):
            if not works[i].parent_internal_id:
                # NOTE(review): this logs the value *before* it is loaded (always falsy at this point).
                self.logger.debug(f"internal_id {works[i].internal_id} loads parent_internal_id {works[i].parent_internal_id}")
                works[i].parent_internal_id = internal_id_relation_map.get(works[i].internal_id, None)
    return self.format_works(works)
1816
def get_current_works(self):
    """
    *** Function called by Marshaller agent.

    Sync, then return the running works. Nested running workflows contribute
    their own ``get_current_works()`` results.
    """
    self.sync_works(to_cancel=self.to_cancel)
    running = []
    for work_id in self.current_running_works:
        item = self.works[work_id]
        if isinstance(item, Work):
            running.append(item)
        if isinstance(item, Workflow):
            running.extend(item.get_current_works())
    return running
1831
def get_all_works(self, synchronize=True):
    """
    *** Function called by Marshaller agent.

    Return every ``Work`` of this workflow plus, recursively, the works
    returned by nested workflows' ``get_all_works(synchronize=False)``.

    :param synchronize: when true, call ``sync_works`` first.
    """
    self.logger.info("%s get_all_works" % self.get_internal_id())
    if synchronize:
        self.sync_works(to_cancel=self.to_cancel)

    collected = []
    for key in self.works:
        item = self.works[key]
        if isinstance(item, Work):
            collected.append(item)
        if isinstance(item, Workflow):
            collected.extend(item.get_all_works(synchronize=False))
    self.logger.info("%s get_all_works done" % self.get_internal_id())
    return collected
1850
def get_primary_initial_collection(self):
    """
    *** Function called by Clerk agent.

    Return the primary input collection of the work this workflow starts from.

    The starting work is chosen in this order of preference:

    1. the primary initial work;
    2. the first initial work;
    3. the first independent work;
    4. the first work in ``get_works()``.

    A nested workflow is resolved recursively. Returns None when there are no
    works; previously, indexing ``dict.keys()`` raised TypeError here.
    """
    works = self.get_works()
    if self.primary_initial_work:
        start_id = self.primary_initial_work
    elif self.initial_works:
        start_id = self.initial_works[0]
    elif self.independent_works:
        start_id = self.independent_works[0]
    elif works:
        # dict views are not indexable in Python 3; take the first key explicitly.
        start_id = next(iter(works))
    else:
        return None

    start = works[start_id]
    if isinstance(start, Workflow):
        return start.get_primary_initial_collection()
    return start.get_primary_input_collection()
1878
def get_dependency_works(self, work_id, depth, max_depth):
    """Return the transitive dependencies of ``work_id`` from ``self.work_dependencies``.

    Recursion stops once ``depth`` exceeds ``max_depth``. Ids without an
    entry in ``work_dependencies`` are treated as having no dependencies
    (previously a KeyError). The result is de-duplicated in first-seen order.
    """
    if depth > max_depth:
        return []

    deps = []
    for dep_work_id in self.work_dependencies.get(work_id, []):
        deps.append(dep_work_id)
        deps += self.get_dependency_works(dep_work_id, depth + 1, max_depth)
    return list(dict.fromkeys(deps))
1890
def order_independent_works(self):
    """Order ``independent_works`` by ancestry and chain them via ``parent_internal_id``.

    Dependencies come from each work's ``get_ancestry_works()`` task names,
    mapped to internal ids. Works are promoted into ``independent_works``
    once all their dependencies have been promoted. When no progress is
    possible (cyclic dependencies, or dependencies on works that are not
    independent works), the remaining works are appended as-is.

    Each work in the final order then gets the previous one as its
    ``parent_internal_id``, and ``internal_id_relation_map`` stores
    ``internal_id -> parent_internal_id`` for all of them.
    """
    self.log_debug("ordering independent works")
    ind_work_ids = self.independent_works
    self.log_debug("independent works: %s" % (str(ind_work_ids)))
    self.independent_works = []
    self.work_dependencies = {}

    all_works = self.get_all_works(synchronize=False)
    task_name_to_internal_id_map = {}
    for work in all_works:
        task_name_to_internal_id_map[work.task_name] = work.get_internal_id()

    for ind_work_id in ind_work_ids:
        work = self.works[ind_work_id]
        parent_task_names = work.get_ancestry_works()
        self.work_dependencies[ind_work_id] = []
        if parent_task_names:
            # NOTE(review): raises KeyError for an ancestry task name that matches no work.
            parent_internal_ids = [task_name_to_internal_id_map[t_name] for t_name in parent_task_names]
            self.work_dependencies[ind_work_id] = parent_internal_ids
    self.log_debug('work dependencies: %s' % str(self.work_dependencies))

    while True:
        # Each pass:
        # (1) promote works with no remaining dependencies;
        # (2) drop promoted works from the dependency table;
        # (3) strike promoted ids from the remaining dependency lists.
        has_changes = False
        for work_id in self.work_dependencies:
            if work_id not in self.independent_works and len(self.work_dependencies[work_id]) == 0:
                self.independent_works.append(work_id)
                has_changes = True
        for work_id in self.independent_works:
            if work_id in self.work_dependencies:
                del self.work_dependencies[work_id]
                has_changes = True
        for work_id in self.work_dependencies:
            for in_work_id in self.independent_works:
                if in_work_id in self.work_dependencies[work_id]:
                    # list.remove drops one occurrence; duplicates are cleared on later passes.
                    self.work_dependencies[work_id].remove(in_work_id)
                    has_changes = True
        if not self.work_dependencies:
            break
        if not has_changes:
            # Stuck: append every remaining work in dict order and stop.
            self.log_debug("There are loop dependencies between works.")
            self.log_debug('independent_works N: %s' % str(self.independent_works))
            self.log_debug('work dependencies N: %s' % str(self.work_dependencies))
            for work_id in self.work_dependencies:
                if work_id not in self.independent_works:
                    self.independent_works.append(work_id)
            break
    self.log_debug('independent_works: %s' % str(self.independent_works))
    self.log_debug("ordered independent works")

    # Chain each independent work to the one before it in the final order.
    pre_work = None
    internal_id_relation_map = {}
    for work_id in self.independent_works:
        work = self.works[work_id]
        if pre_work is not None:
            work.parent_internal_id = pre_work.internal_id
        pre_work = work
        internal_id_relation_map[work.internal_id] = work.parent_internal_id
    self.internal_id_relation_map = internal_id_relation_map
1951
def first_initialize(self):
    """Queue the starting works the first time the workflow is synchronized.

    Runs once, guarded by ``first_initial``. It orders the independent works,
    then appends to ``to_start_works``:

    - the initial works, if any;
    - otherwise the independent works;
    - otherwise only the first work in ``get_works()``.

    A workflow with no works queues nothing; previously this raised
    IndexError.
    """
    if self.first_initial:
        return
    self.log_debug("first initializing")
    self.first_initial = True
    self.order_independent_works()
    if self.initial_works:
        tostart_works = self.initial_works
    elif self.independent_works:
        tostart_works = self.independent_works
    else:
        # Start only the first work; slicing keeps an empty workflow from raising.
        tostart_works = list(self.get_works().keys())[:1]

    to_start_works = self.to_start_works
    for work_id in tostart_works:
        to_start_works.append(work_id)
    self.to_start_works = to_start_works
    self.log_debug("first initialized")
1971
1972 def sync_works(self, to_cancel=False):
1973 if to_cancel:
1974 self.to_cancel = to_cancel
1975 self.log_debug("%s num_run %s synchroning works" % (self.get_internal_id(), self.num_run))
1976 self.first_initialize()
1977
1978 self.refresh_works()
1979
1980 for k in self.works:
1981 work = self.works[k]
1982 self.log_debug("work %s is_terminated(%s:%s), is_submitted: %s, transforming: %s" % (work.get_internal_id(),
1983 work.is_terminated(synchronize=False),
1984 work.get_status(),
1985 work.submitted,
1986 work.transforming))
1987 if work.get_internal_id() not in self.current_running_works and work.get_status() in [WorkStatus.Transforming]:
1988 self.current_running_works.append(work.get_internal_id())
1989
1990 self.log_debug(f"{self.get_internal_id()} current_running_works: {self.current_running_works}")
1991 self.log_debug(f"{self.get_internal_id()} new_to_run_works: {self.new_to_run_works}")
1992 self.log_debug(f"{self.get_internal_id()} submitting_works: {self.submitting_works}")
1993 submitting_works = self.submitting_works
1994 for work in [self.works[k] for k in self.new_to_run_works]:
1995 if work.transforming:
1996 self.new_to_run_works.remove(work.get_internal_id())
1997 submitting_works.append(work.get_internal_id())
1998 if work.get_internal_id() not in self.current_running_works:
1999 self.current_running_works.append(work.get_internal_id())
2000 self.submitting_works = submitting_works
2001 for work in [self.works[k] for k in self.submitting_works]:
2002 self.log_info("Work %s is_submitted(%s)" % (work.get_internal_id(), work.submitted))
2003 if work.submitted:
2004 self.submitting_works.remove(work.get_internal_id())
2005
2006
2007 for k in self.current_running_works:
2008 work = self.works[k]
2009 if isinstance(work, Workflow):
2010 work.sync_works(to_cancel=self.to_cancel)
2011
2012 if work.is_terminated(synchronize=False):
2013 self.log_debug("work %s is_terminated, sync_global_parameters_from_work" % (work.get_internal_id()))
2014 self.set_source_parameters(work.get_internal_id())
2015 self.sync_global_parameters_from_work(work)
2016
2017 self.log_info("Work %s num_run %s is terminated(%s)" % (work.get_internal_id(), self.num_run, work.get_status()))
2018
2019 if work.get_internal_id() not in self.work_conds:
2020
2021 self.log_info("Work %s has no condition dependencies" % work.get_internal_id())
2022 if work.get_internal_id() not in self.terminated_works:
2023 self.terminated_works.append(work.get_internal_id())
2024 if work.get_internal_id() in self.current_running_works:
2025 self.current_running_works.remove(work.get_internal_id())
2026 else:
2027
2028
2029
2030
2031 if work.get_internal_id() not in self.terminated_works:
2032 self.terminated_works.append(work.get_internal_id())
2033 if work.get_internal_id() in self.current_running_works:
2034 self.current_running_works.remove(work.get_internal_id())
2035
2036 for k in self.work_conds:
2037 work = self.works[k]
2038
2039
2040 self.log_debug("Work %s has condition dependencies %s" % (work.get_internal_id(), self.work_conds[work.get_internal_id()]))
2041 for cond_id in self.work_conds[work.get_internal_id()]:
2042 cond = self.conditions[cond_id]
2043
2044
2045 self.log_debug("Work %s has condition dependencie %s" % (work.get_internal_id(), cond.get_internal_id()))
2046
2047 self.enable_next_works(work, cond)
2048
2049 self.num_finished_works = 0
2050 self.num_subfinished_works = 0
2051 self.num_failed_works = 0
2052 self.num_expired_works = 0
2053 self.num_cancelled_works = 0
2054 self.num_suspended_works = 0
2055
2056 for k in self.works:
2057 work = self.works[k]
2058 if work.is_terminated():
2059 if work.is_finished(synchronize=False):
2060 self.num_finished_works += 1
2061 elif work.is_subfinished(synchronize=False):
2062 self.num_subfinished_works += 1
2063 elif work.is_failed(synchronize=False):
2064 self.num_failed_works += 1
2065 elif work.is_expired(synchronize=False):
2066 self.num_expired_works += 1
2067 elif work.is_cancelled(synchronize=False):
2068 self.num_cancelled_works += 1
2069 elif work.is_suspended(synchronize=False):
2070 self.num_suspended_works += 1
2071
2072
2073
2074
2075
2076 log_str = "%s num_run %s num_total_works: %s" % (self.get_internal_id(), self.num_run, self.num_total_works)
2077 log_str += ", num_finished_works: %s" % self.num_finished_works
2078 log_str += ", num_subfinished_works: %s" % self.num_subfinished_works
2079 log_str += ", num_failed_works: %s" % self.num_failed_works
2080 log_str += ", num_expired_works: %s" % self.num_expired_works
2081 log_str += ", num_cancelled_works: %s" % self.num_cancelled_works
2082 log_str += ", num_suspended_works: %s" % self.num_suspended_works
2083 log_str += ", new_to_run_works: %s" % len(self.new_to_run_works)
2084 log_str += ", current_running_works: %s" % len(self.current_running_works)
2085 self.log_debug(log_str)
2086
2087 self.refresh_works()
2088 self.log_debug("%s synchronized works" % self.get_internal_id())
2089
2090 def resume_works(self):
2091 self.to_cancel = False
2092 self.num_subfinished_works = 0
2093 self.num_finished_works = 0
2094 self.num_failed_works = 0
2095 self.num_cancelled_works = 0
2096 self.num_suspended_works = 0
2097 self.num_expired_works = 0
2098
2099 self.last_updated_at = datetime.datetime.utcnow()
2100
2101 t_works = self.terminated_works
2102 self.terminated_works = []
2103 self.current_running_works = self.current_running_works + t_works
2104 for work in [self.works[k] for k in self.current_running_works]:
2105 if isinstance(work, Workflow):
2106 work.resume_works()
2107 else:
2108 work.resume_work()
2109
2110 def get_relation_data(self, work):
2111 ret = {'work': {'workload_id': work.workload_id,
2112 'external_id': work.external_id,
2113 'work_name': work.get_work_name()}}
2114 if hasattr(work, 'get_ancestry_works'):
2115 ret['work']['ancestry_works'] = work.get_ancestry_works()
2116
2117 next_works = work.next_works
2118 if next_works:
2119 next_works_data = []
2120 for next_id in next_works:
2121 next_work = self.works[next_id]
2122 if isinstance(next_work, Workflow):
2123 next_work_data = next_work.get_relation_map()
2124 else:
2125 next_work_data = self.get_relation_data(next_work)
2126 next_works_data.append(next_work_data)
2127 ret['next_works'] = next_works_data
2128 return ret
2129
2130 def organzie_based_on_ancestry_works(self, works):
2131 new_ret = []
2132
2133 ordered_items = {}
2134 left_items = []
2135 for item in works:
2136 if type(item) in [dict]:
2137 if 'ancestry_works' not in item['work'] or not item['work']['ancestry_works']:
2138 new_ret.append(item)
2139 ordered_items[item['work']['work_name']] = item
2140 else:
2141
2142 left_items.append(item)
2143 elif type(item) in [list]:
2144
2145
2146
2147
2148
2149
2150 new_ret.append(item)
2151 while True:
2152 new_left_items = left_items
2153 left_items = []
2154 has_updates = False
2155 for item in new_left_items:
2156 ancestry_works = item['work']['ancestry_works']
2157 all_ancestry_ready = True
2158 for work_name in ancestry_works:
2159 if work_name not in ordered_items and work_name != item['work']['work_name']:
2160 all_ancestry_ready = False
2161 if all_ancestry_ready:
2162 for work_name in ancestry_works:
2163 if work_name != item['work']['work_name']:
2164 if 'next_works' not in ordered_items[work_name]:
2165 ordered_items[work_name]['next_works'] = [item]
2166 else:
2167 ordered_items[work_name]['next_works'].append(item)
2168 has_updates = True
2169 ordered_items[item['work']['work_name']] = item
2170 else:
2171 left_items.append(item)
2172 if not has_updates or not left_items:
2173 break
2174 for item in left_items:
2175 new_ret.append(item)
2176 return new_ret
2177
2178 def get_relation_map(self):
2179 ret = []
2180 init_works = self.init_works
2181 for internal_id in init_works:
2182 if isinstance(self.works[internal_id], Workflow):
2183 work_data = self.works[internal_id].get_relation_map()
2184 else:
2185 work_data = self.get_relation_data(self.works[internal_id])
2186 ret.append(work_data)
2187 ret = self.organzie_based_on_ancestry_works(ret)
2188 return ret
2189
2190 def clean_works(self):
2191 self.num_subfinished_works = 0
2192 self.num_finished_works = 0
2193 self.num_failed_works = 0
2194 self.num_cancelled_works = 0
2195 self.num_suspended_works = 0
2196 self.num_expired_works = 0
2197 self.num_total_works = 0
2198
2199 self.last_updated_at = datetime.datetime.utcnow()
2200 if self.build_work:
2201 self.build_work.clean_works()
2202
2203 self.terminated_works = []
2204 self.current_running_works = []
2205
2206
2207
2208 self.first_initial = False
2209 self.new_to_run_works = []
2210
2211 def get_exact_workflows(self):
2212 """
2213 *** Function called by Clerk agent.
2214
2215 TODO: The primary dataset for the initial work is a dataset with '*'.
2216 workflow.primary_initial_collection = 'some datasets with *'
2217 collections = get_collection(workflow.primary_initial_collection)
2218 wfs = []
2219 for coll in collections:
2220 wf = self.copy()
2221 wf.name = self.name + "_" + number
2222 wf.primary_initial_collection = coll
2223 wfs.append(wf)
2224 return wfs
2225 """
2226 return [self]
2227
2228 def is_terminated(self, synchronize=True, new=False):
2229 """
2230 *** Function called by Marshaller agent.
2231 """
2232 if synchronize:
2233 self.sync_works(to_cancel=self.to_cancel)
2234 if self.to_cancel:
2235 if new:
2236 if len(self.new_to_run_works) == 0 and len(self.current_running_works) == 0 and self.num_total_works > 0:
2237 return True
2238 else:
2239 if len(self.current_running_works) == 0:
2240 return True
2241 else:
2242 num_total_works = self.num_finished_works + self.num_subfinished_works + self.num_failed_works
2243 num_total_works += self.num_expired_works + self.num_cancelled_works + self.num_suspended_works
2244 num_total_works += len(self.new_to_run_works) + len(self.current_running_works)
2245 if self.num_total_works > 0 and len(self.new_to_run_works) == 0 and len(self.current_running_works) == 0 and num_total_works == self.num_total_works:
2246 return True
2247 return False
2248
2249 def is_finished(self, synchronize=True):
2250 """
2251 *** Function called by Marshaller agent.
2252 """
2253 return self.is_terminated(synchronize=synchronize) and self.num_finished_works == self.num_total_works and (self.num_total_works > 0)
2254
2255 def is_subfinished(self, synchronize=True):
2256 """
2257 *** Function called by Marshaller agent.
2258 """
2259 return self.is_terminated(synchronize=synchronize) and (self.num_finished_works + self.num_subfinished_works > 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works)
2260
2261 def is_processed(self, synchronize=True):
2262 """
2263 *** Function called by Transformer agent.
2264 """
2265 return self.is_terminated(synchronize=synchronize) and (self.num_finished_works + self.num_subfinished_works > 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works)
2266
2267 def is_failed(self, synchronize=True):
2268 """
2269 *** Function called by Marshaller agent.
2270 """
2271 return self.is_terminated(synchronize=synchronize) and (self.num_failed_works > 0) and (self.num_cancelled_works == 0) and (self.num_suspended_works == 0) and (self.num_expired_works == 0)
2272
2273 def is_to_expire(self, expired_at=None, pending_time=None, request_id=None):
2274 if self.expired:
2275
2276 return False
2277 if expired_at:
2278 if type(expired_at) in [str]:
2279 expired_at = str_to_date(expired_at)
2280 if expired_at < datetime.datetime.utcnow():
2281 self.logger.info("Request(%s) expired_at(%s) is smaller than utc now(%s), expiring" % (request_id,
2282 expired_at,
2283 datetime.datetime.utcnow()))
2284 return True
2285
2286 act_pending_time = None
2287 if self.pending_time:
2288
2289 act_pending_time = float(self.pending_time)
2290 else:
2291 if pending_time:
2292 act_pending_time = float(pending_time)
2293 if act_pending_time:
2294 act_pending_seconds = int(86400 * act_pending_time)
2295 if self.last_updated_at + datetime.timedelta(seconds=act_pending_seconds) < datetime.datetime.utcnow():
2296 log_str = "Request(%s) last updated at(%s) + pending seconds(%s)" % (request_id,
2297 self.last_updated_at,
2298 act_pending_seconds)
2299 log_str += " is smaller than utc now(%s), expiring" % (datetime.datetime.utcnow())
2300 self.logger.info(log_str)
2301 return True
2302
2303 return False
2304
2305 def is_expired(self, synchronize=True):
2306 """
2307 *** Function called by Marshaller agent.
2308 """
2309
2310 return self.is_terminated(synchronize=synchronize) and self.expired
2311
2312 def is_cancelled(self, synchronize=True):
2313 """
2314 *** Function called by Marshaller agent.
2315 """
2316 return self.is_terminated(synchronize=synchronize) and (self.num_cancelled_works > 0)
2317
2318 def is_suspended(self, synchronize=True):
2319 """
2320 *** Function called by Marshaller agent.
2321 """
2322 return self.is_terminated(synchronize=synchronize) and (self.num_suspended_works > 0)
2323
2324 def get_terminated_msg(self):
2325 """
2326 *** Function called by Marshaller agent.
2327 """
2328 if self.last_work:
2329 return self.works[self.last_work].get_terminated_msg()
2330 return None
2331
2332 def get_status(self):
2333 if self.is_terminated():
2334 if self.is_finished(synchronize=False):
2335 return WorkStatus.Finished
2336 elif self.is_subfinished(synchronize=False):
2337 return WorkStatus.SubFinished
2338 elif self.is_failed(synchronize=False):
2339 return WorkStatus.Failed
2340 elif self.is_expired(synchronize=False):
2341 return WorkStatus.Expired
2342 elif self.is_cancelled(synchronize=False):
2343 return WorkStatus.Cancelled
2344 elif self.is_suspended(synchronize=False):
2345 return WorkStatus.Suspended
2346 return WorkStatus.Transforming
2347
2348 def depend_on(self, work):
2349 return False
2350
2351 def add_proxy(self):
2352 self.proxy = get_proxy()
2353 if not self.proxy:
2354 raise Exception("Cannot get local proxy")
2355
2356 def get_proxy(self):
2357 return self.proxy
2358
2359
2360 class Workflow(Base):
2361 def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None, logger=None):
2362
2363 self.logger = logger
2364 if self.logger is None:
2365 self.setup_logger()
2366
2367 self.template = WorkflowBase(name=name, workload_id=workload_id, lifetime=lifetime, pending_time=pending_time, logger=logger)
2368
2369 self.build_work = None
2370
2371 self.parent_num_run = None
2372 self._num_run = 0
2373 self.runs = {}
2374 self.loop_condition_position = 'end'
2375
2376 def setup_logger(self):
2377
2378 self.logger = logging.getLogger(self.get_class_name())
2379
2380 def log_info(self, info):
2381 if self.logger is None:
2382 self.setup_logger()
2383 self.logger.info(info)
2384
2385 def log_debug(self, info):
2386 if self.logger is None:
2387 self.setup_logger()
2388 self.logger.debug(info)
2389
2390 def __deepcopy__(self, memo):
2391 logger = self.logger
2392 self.logger = None
2393
2394 cls = self.__class__
2395 result = cls.__new__(cls)
2396
2397 memo[id(self)] = result
2398
2399
2400 for k, v in self.__dict__.items():
2401 setattr(result, k, copy.deepcopy(v, memo))
2402
2403 self.logger = logger
2404 result.logger = logger
2405 return result
2406
2407 def get_template_id(self):
2408 return self.template.get_template_id()
2409
2410 def is_with_steps(self):
2411 return self.template.is_with_steps()
2412
2413 def set_additional_data_storage(self, storage):
2414 self.template.set_additional_data_storage(storage)
2415
2416 def get_additional_data_storage(self):
2417 return self.template.get_additional_data_storage()
2418
2419 def convert_data_to_additional_data_storage(self, storage, storage_name=None, replace_storage_name=False):
2420 return self.template.convert_data_to_additional_data_storage(storage, storage_name=storage_name, replace_storage_name=replace_storage_name)
2421
2422 @property
2423 def is_workflow_step(self):
2424 if self.runs:
2425 return self.runs[str(self.num_run)].is_workflow_step
2426 return self.template.is_workflow_step
2427
2428 @is_workflow_step.setter
2429 def is_workflow_step(self, value):
2430 if self.runs:
2431 self.runs[str(self.num_run)].is_workflow_step = value
2432 self.template.is_workflow_step = value
2433
2434 @property
2435 def step_name(self):
2436 if self.runs:
2437 return self.runs[str(self.num_run)].step_name
2438 return self.template.step_name
2439
2440 @step_name.setter
2441 def step_name(self, value):
2442 if self.runs:
2443 self.runs[str(self.num_run)].step_name = value
2444 self.template.step_name = value
2445
2446 @property
2447 def workflow_data(self):
2448 if self.runs:
2449 return self.runs[str(self.num_run)].workflow_data
2450 return self.template.workflow_data
2451
2452 @workflow_data.setter
2453 def workflow_data(self, value):
2454 if self.runs:
2455 self.runs[str(self.num_run)].workflow_data = value
2456 self.template.workflow_data = value
2457
2458 def split_workflow_to_steps(self, request_cache=None, max_request_length=None):
2459 return self.template.split_workflow_to_steps(request_cache=request_cache, max_request_length=max_request_length)
2460
    @property
    def metadata(self):
        """Serialize the workflow's run state into a dict.

        Keys: 'build' (build-work metadata or None), 'parent_num_run',
        'num_run', 'runs' (per-run metadata keyed by run id) and, only when no
        run exists yet, 'parameter_links' taken from the template.
        """
        run_metadata = {'build': self.get_build_metadata(),
                        'parent_num_run': self.parent_num_run,
                        'num_run': self._num_run,
                        'runs': {}}
        for run_id in self.runs:
            run_metadata['runs'][run_id] = self.runs[run_id].metadata
        if not self.runs:
            # Without runs the parameter links live only on the template.
            run_metadata['parameter_links'] = self.template.get_parameter_links_metadata()
        return run_metadata

    @metadata.setter
    def metadata(self, value):
        """Restore run state from a dict produced by the ``metadata`` getter.

        Each run is rebuilt as a copy of the template and then loaded with its
        own metadata. 'parent_num_run', 'num_run' and 'runs' are required keys
        (a missing one raises KeyError); 'build' and 'parameter_links' are optional.
        """
        self.template.load_metadata()
        run_metadata = value
        build_metadata = run_metadata['build'] if 'build' in run_metadata else {}
        self.set_build_metadata(build_metadata)
        self.parent_num_run = run_metadata['parent_num_run']
        self._num_run = run_metadata['num_run']
        runs = run_metadata['runs']
        if not runs and 'parameter_links' in run_metadata:
            parameter_links = run_metadata['parameter_links']
            self.template.set_parameter_links_metadata(parameter_links)
        for run_id in runs:
            # Set parent_num_run on the template before copying so the copy carries it.
            self.template.parent_num_run = self.parent_num_run
            self.runs[run_id] = self.template.copy()
            self.runs[run_id].metadata = runs[run_id]
            self.runs[run_id].parent_num_run = self.parent_num_run
2490
2491
2492 @property
2493 def independent_works(self):
2494 if self.runs:
2495 return self.runs[str(self.num_run)].independent_works
2496 return self.template.independent_works
2497
2498 @independent_works.setter
2499 def independent_works(self, value):
2500 if self.runs:
2501 self.runs[str(self.num_run)].independent_works = value
2502 self.template.independent_works = value
2503
2504 def add_next_work(self, work_id):
2505 if self.runs:
2506 self.runs[str(self.num_run)].add_next_work(work_id)
2507 else:
2508 raise Exception("There are no runs. It should not have next work")
2509
2510 @property
2511 def last_updated_at(self):
2512 if self.runs:
2513 return self.runs[str(self.num_run)].last_updated_at
2514 return None
2515
2516 @last_updated_at.setter
2517 def last_updated_at(self, value):
2518 if self.runs:
2519 self.runs[str(self.num_run)].last_updated_at = value
2520
2521 @property
2522 def name(self):
2523 return self.template.name
2524
2525 @name.setter
2526 def name(self, value):
2527 self.template.name = value
2528
2529 @property
2530 def campaign(self):
2531 return self.template.campaign
2532
2533 @campaign.setter
2534 def campaign(self, value):
2535 self.template.campaign = value
2536
2537 @property
2538 def campaign_scope(self):
2539 return self.template.campaign_scope
2540
2541 @campaign_scope.setter
2542 def campaign_scope(self, value):
2543 self.template.campaign_scope = value
2544
2545 @property
2546 def campaign_group(self):
2547 return self.template._campaign_group
2548
2549 @campaign_group.setter
2550 def campaign_group(self, value):
2551 self.template._campaign_group = value
2552
2553 @property
2554 def campaign_tag(self):
2555 return self.template._campaign_tag
2556
2557 @campaign_tag.setter
2558 def campaign_tag(self, value):
2559 self.template._campaign_tag = value
2560
2561 @property
2562 def max_processing_requests(self):
2563 return self.template._max_processing_requests
2564
2565 @max_processing_requests.setter
2566 def max_processing_requests(self, value):
2567 self.template._max_processing_requests = value
2568
2569 @property
2570 def username(self):
2571 return self.template.username
2572
2573 @username.setter
2574 def username(self, value):
2575 self.template.username = value
2576
2577 @property
2578 def userdn(self):
2579 return self.template.userdn
2580
2581 @userdn.setter
2582 def userdn(self, value):
2583 self.template.userdn = value
2584
2585 @property
2586 def lifetime(self):
2587 return self.template.lifetime
2588
2589 @lifetime.setter
2590 def lifetime(self, value):
2591 self.template.lifetime = value
2592
2593 @property
2594 def to_cancel(self):
2595 return self.template.to_cancel
2596
2597 @to_cancel.setter
2598 def to_cancel(self, value):
2599 if self.runs:
2600 self.runs[str(self.num_run)].to_cancel = value
2601 self.template.to_cancel = value
2602
2603 @property
2604 def num_run(self):
2605 return self._num_run
2606
2607 @num_run.setter
2608 def num_run(self, value):
2609 self._num_run = value
2610
2611 def get_combined_num_run(self):
2612 if self.parent_num_run:
2613 return str(self.parent_num_run) + "_" + str(self.num_run)
2614 return str(self.num_run)
2615
    @property
    def transforming(self):
        """True once a run exists for the current ``num_run``."""
        if self.runs and str(self.num_run) in self.runs:
            return True
        return False

    @transforming.setter
    def transforming(self, value):
        """Create the run for the current ``num_run`` if it does not exist yet.

        NOTE(review): ``value`` is ignored; any assignment (even False)
        creates the run. Confirm callers only ever assign True.
        """
        # Runs are numbered from 1; promote the initial 0.
        if self._num_run < 1:
            self._num_run = 1
        if str(self.num_run) not in self.runs:
            # Each run is a fresh copy of the template, carrying parent_num_run.
            self.template.parent_num_run = self.parent_num_run
            self.runs[str(self.num_run)] = self.template.copy()
            if self.runs[str(self.num_run)].has_loop_condition():
                self.runs[str(self.num_run)].num_run = self.num_run
            self.runs[str(self.num_run)].parent_num_run = self.parent_num_run


            # For looping workflows, carry the previous run's parameter links
            # into the new run.
            if self.runs[str(self.num_run)].has_loop_condition():
                if self._num_run > 1:
                    p_metadata = self.runs[str(self.num_run - 1)].get_metadata_item('parameter_links')
                    self.runs[str(self.num_run)].add_metadata_item('parameter_links', p_metadata)
2638
2639 @property
2640 def submitted(self):
2641 return self.transforming
2642
2643 @submitted.setter
2644 def submitted(self, value):
2645 pass
2646
2647 def has_dependency(self):
2648 return False
2649
2650 def set_workload_id(self, workload_id):
2651 if self.runs:
2652 self.runs[str(self.num_run)].workload_id = workload_id
2653 else:
2654 self.template.workload_id = workload_id
2655
2656
2657 def set_internal_id(self, value):
2658 if self.runs:
2659 return self.runs[str(self.num_run)].set_internal_id(value)
2660 return self.template.set_internal_id(value)
2661
2662 def get_internal_id(self):
2663 if self.runs:
2664 return self.runs[str(self.num_run)].get_internal_id()
2665 return self.template.get_internal_id()
2666
2667 def get_workload_id(self):
2668 if self.runs:
2669 return self.runs[str(self.num_run)].workload_id
2670 return self.template.workload_id
2671
2672 def get_site(self):
2673 return self.template.get_site()
2674
2675 def get_cloud(self):
2676 return self.template.get_cloud()
2677
2678 def get_queue(self):
2679 return self.template.get_queue()
2680
2681 def add_work(self, work, initial=False, primary=False):
2682 self.template.add_work(work, initial, primary)
2683
2684 def add_build_work(self, work, initial=False, primary=False):
2685 self.template.add_build_work(work, initial, primary)
2686
    def get_build_metadata(self):
        """Return the build work's metadata.

        Prefers the cached ``self.build_work``; otherwise reads the template's
        build work. Returns None when there is no build work at all.
        """
        if self.build_work is not None:
            return self.build_work.metadata

        build_work = self.template.get_build_work()
        if build_work is not None:
            return build_work.metadata
        return None

    def set_build_metadata(self, metadata):
        """Load *metadata* into the cached build work.

        NOTE(review): when the template has a build work, the cached build
        work is replaced by a fresh ``self.template.copy()`` after the first
        branch already set its metadata. The first assignment is then
        discarded. Confirm this is intended.
        """
        if self.build_work is not None:
            self.build_work.metadata = metadata

        build_work = self.template.get_build_work()
        if build_work is not None:
            # NOTE(review): copies the whole template, not ``build_work`` — confirm intended.
            self.build_work = self.template.copy()
            self.build_work.metadata = metadata

    def get_build_work(self):
        """Return the cached build work, creating and signing it on first use.

        Returns None (implicitly) when the template has no build work.
        """
        if self.build_work is not None:
            return self.build_work
        build_work = self.template.get_build_work()
        if build_work is not None:
            # NOTE(review): copies the whole template, not ``build_work`` — confirm intended.
            self.build_work = self.template.copy()
            self.build_work.sign()
            return self.build_work
2713
2714 def has_to_build_work(self):
2715 build_work = self.get_buil_work()
2716 if build_work is None:
2717 return False
2718
2719 if not (build_work.is_started() or build_work.is_starting()):
2720 return True
2721 return False
2722
2723 def add_condition(self, cond):
2724 self.template.add_condition(cond)
2725
2726 def add_parameter_link(self, work_source, work_destinations, parameter_link):
2727 self.template.add_parameter_link(work_source, work_destinations, parameter_link)
2728
2729 def find_workflow_from_work(self, work):
2730 return self.template.find_workflow_from_work(work)
2731
2732 def find_parameter_links_from_id(self, internal_id):
2733 if self.runs:
2734 return self.runs[str(self.num_run)].find_parameter_links_from_id(internal_id)
2735 return self.template.find_parameter_links_from_id(internal_id)
2736
2737 def refresh_parameter_links(self):
2738 if self.runs:
2739 self.runs[str(self.num_run)].refresh_parameter_links()
2740
2741 def set_global_parameters(self, value):
2742 self.template.set_global_parameters(value)
2743
2744 def set_sliced_global_parameters(self, source, name=None, index=0):
2745 self.template.set_sliced_global_parameters(source, name=name, index=index)
2746
2747 def sync_global_parameters_from_work(self, work):
2748 if self.runs:
2749 return self.runs[str(self.num_run)].sync_global_parameters_from_work(work)
2750 return self.template.sync_global_parameters_from_work(work)
2751
2752 def sync_global_parameters(self, global_parameters, sliced_global_parameters=None):
2753 if self.runs:
2754 return self.runs[str(self.num_run)].sync_global_parameters(global_parameters, sliced_global_parameters)
2755 return self.template.sync_global_parameters(global_parameters, sliced_global_parameters)
2756
2757 def get_new_works(self, synchronize=True):
2758 self.logger.info("%s get_new_works" % self.get_internal_id())
2759
2760 build_work = self.get_build_work()
2761 if build_work:
2762 build_work.num_run = 0
2763 if not (build_work.is_started() or build_work.is_starting()):
2764 return build_work
2765 elif not build_work.is_terminated():
2766 return []
2767 elif build_work.is_terminated() and not build_work.is_finished():
2768 return []
2769
2770 self.log_debug("synchronizing works")
2771 if synchronize:
2772 self.sync_works(to_cancel=self.to_cancel)
2773 self.log_debug("synchronized works")
2774 works = []
2775 self.log_debug("%s num_run: %s" % (self.get_internal_id(), self.num_run))
2776 self.log_debug("%s runs: %s" % (self.get_internal_id(), str(self.runs)))
2777 if self.runs:
2778
2779 works = self.runs[str(self.num_run)].get_new_works(synchronize=True)
2780 self.logger.info("%s new workers: %s" % (self.get_internal_id(), str(works)))
2781 self.runs[str(self.num_run)].transforming = True
2782 if works:
2783 for work in works:
2784 work.num_run = int(self.num_run) if self.num_run is not None else 0
2785 self.logger.info("%s get_new_works done" % self.get_internal_id())
2786 return works
2787
2788 def get_current_works(self):
2789 build_work = self.get_build_work()
2790 if build_work:
2791 build_work.num_run = 0
2792 if (build_work.is_started() or build_work.is_starting()):
2793 if (not build_work.is_terminated()):
2794 return [build_work]
2795 elif not build_work.is_finished():
2796 return []
2797 else:
2798 return []
2799
2800 self.sync_works(to_cancel=self.to_cancel)
2801 if self.runs:
2802 works = self.runs[str(self.num_run)].get_current_works()
2803 for work in works:
2804 work.num_run = int(self.num_run) if self.num_run is not None else 0
2805 return []
2806
2807 def get_all_works(self, synchronize=True):
2808 self.logger.info("%s get_all_works" % self.get_internal_id())
2809 works = []
2810
2811 build_work = self.get_build_work()
2812 if build_work:
2813 if build_work.is_finished():
2814 build_work.num_run = 0
2815 works = [build_work]
2816 else:
2817 return [build_work]
2818
2819 if synchronize:
2820 self.sync_works(to_cancel=self.to_cancel)
2821 if self.runs:
2822 run_works = self.runs[str(self.num_run)].get_all_works(synchronize=False)
2823 if run_works:
2824 for work in run_works:
2825 work.num_run = int(self.num_run) if self.num_run is not None else 0
2826
2827 works = works + run_works
2828
2829 self.logger.info("%s get_all_works done" % self.get_internal_id())
2830 return works
2831
2832 def get_primary_initial_collection(self):
2833 if self.runs:
2834 return self.runs[str(self.num_run)].get_primary_initial_collection()
2835 return self.template.get_primary_initial_collection()
2836
2837 def resume_works(self):
2838 if self.runs:
2839 self.runs[str(self.num_run)].resume_works()
2840 self.template.to_cancel = False
2841
2842 def clean_works(self):
2843
2844
2845 self.template.clean_works()
2846 self.parent_num_run = None
2847 self._num_run = 0
2848 self.runs = {}
2849 self.build_work = None
2850
2851 def is_to_expire(self, expired_at=None, pending_time=None, request_id=None):
2852 build_work = self.get_build_work()
2853 if build_work:
2854 if not build_work.is_terminated():
2855 return build_work.is_to_expire(expired_at=expired_at, pending_time=pending_time, request_id=request_id)
2856 elif build_work.is_terminated() and not build_work.is_finished():
2857 return False
2858 else:
2859 pass
2860
2861 if self.runs:
2862 return self.runs[str(self.num_run)].is_to_expire(expired_at=expired_at, pending_time=pending_time, request_id=request_id)
2863 return False
2864
2865 def is_terminated(self, synchronize=True):
2866 build_work = self.get_build_work()
2867 if build_work:
2868 if build_work.is_terminated():
2869 if not build_work.is_finished():
2870 return True
2871 else:
2872 pass
2873 else:
2874 return False
2875
2876 if self.runs:
2877 if self.runs[str(self.num_run)].is_terminated(synchronize=synchronize):
2878 if not self.runs[str(self.num_run)].has_loop_condition() or not self.runs[str(self.num_run)].get_loop_condition_status():
2879 return True
2880 return False
2881
2882 def is_finished(self, synchronize=True):
2883 if self.is_terminated(synchronize=synchronize):
2884 build_work = self.get_build_work()
2885 if build_work:
2886 if build_work.is_terminated():
2887 if not build_work.is_finished():
2888 return False
2889 else:
2890 pass
2891 else:
2892 return False
2893 return self.runs[str(self.num_run)].is_finished(synchronize=False)
2894 return False
2895
2896 def is_subfinished(self, synchronize=True):
2897 if self.is_terminated(synchronize=synchronize):
2898 build_work = self.get_build_work()
2899 if build_work:
2900 if build_work.is_terminated():
2901 if not build_work.is_finished():
2902 return False
2903 else:
2904 pass
2905 else:
2906 return False
2907 return self.runs[str(self.num_run)].is_subfinished(synchronize=False)
2908 return False
2909
2910 def is_processed(self, synchronize=True):
2911 if self.is_terminated(synchronize=synchronize):
2912 build_work = self.get_build_work()
2913 if build_work:
2914 if build_work.is_terminated():
2915 if not build_work.is_processed():
2916 return False
2917 else:
2918 pass
2919 else:
2920 return False
2921 return self.runs[str(self.num_run)].is_processed(synchronize=False)
2922 return False
2923
2924 def is_failed(self, synchronize=True):
2925 if self.is_terminated(synchronize=synchronize):
2926 build_work = self.get_build_work()
2927 if build_work:
2928 if build_work.is_terminated():
2929 if not build_work.is_finished():
2930 return True
2931 else:
2932 pass
2933 else:
2934 return False
2935 return self.runs[str(self.num_run)].is_failed(synchronize=False)
2936 return False
2937
2938 def is_expired(self, synchronize=True):
2939 if self.is_terminated(synchronize=synchronize):
2940 build_work = self.get_build_work()
2941 if build_work:
2942 if build_work.is_terminated():
2943 if not build_work.is_finished():
2944 return False
2945 else:
2946 pass
2947 else:
2948 if build_work.is_expired():
2949 return True
2950 else:
2951 return False
2952 return self.runs[str(self.num_run)].is_expired(synchronize=False)
2953 return False
2954
def is_cancelled(self, synchronize=True):
    """Return whether this workflow ended as cancelled.

    The workflow must be terminated (checked with ``synchronize``). When a
    build work exists and did not both terminate and finish, the answer is
    whether the build work itself reports cancelled. Otherwise the current
    run, ``self.runs[str(self.num_run)]``, decides (without
    re-synchronizing).
    """
    if not self.is_terminated(synchronize=synchronize):
        return False

    build_work = self.get_build_work()
    if build_work and not (build_work.is_terminated() and build_work.is_finished()):
        # A build that terminated without finishing may itself be cancelled;
        # consult its status here instead of returning False unconditionally
        # (the status is also consulted while the build is not terminated).
        return bool(build_work.is_cancelled())

    return self.runs[str(self.num_run)].is_cancelled(synchronize=False)
2971
def is_suspended(self, synchronize=True):
    """Return whether this workflow ended as suspended.

    The workflow must be terminated (checked with ``synchronize``). When a
    build work exists and did not both terminate and finish, the answer is
    whether the build work itself reports suspended. Otherwise the current
    run, ``self.runs[str(self.num_run)]``, decides (without
    re-synchronizing).
    """
    if not self.is_terminated(synchronize=synchronize):
        return False

    build_work = self.get_build_work()
    if build_work and not (build_work.is_terminated() and build_work.is_finished()):
        # A build that terminated without finishing may itself be suspended;
        # consult its status here instead of returning False unconditionally
        # (the status is also consulted while the build is not terminated).
        return bool(build_work.is_suspended())

    return self.runs[str(self.num_run)].is_suspended(synchronize=False)
2988
def get_terminated_msg(self):
    """Return the current run's termination message, or ``None`` if not terminated."""
    if not self.is_terminated():
        return None
    current_run = self.runs[str(self.num_run)]
    return current_run.get_terminated_msg()
2993
def get_status(self):
    """Return the workflow status.

    ``WorkStatus.New`` when no run exists yet, the current run's status once
    the workflow is terminated, and ``WorkStatus.Transforming`` otherwise.
    """
    if not self.runs:
        return WorkStatus.New
    if self.is_terminated():
        current_run = self.runs[str(self.num_run)]
        return current_run.get_status()
    return WorkStatus.Transforming
3000
def depend_on(self, work):
    """Delegate the dependency check for ``work`` to the template and return its result."""
    template = self.template
    return template.depend_on(work)
3003
def add_proxy(self):
    """Ask the template work to add its proxy; returns nothing."""
    template = self.template
    template.add_proxy()
3006
def get_proxy(self):
    """Return the proxy obtained from the template work.

    The value returned by ``self.template.get_proxy()`` is passed through to
    the caller rather than being discarded.
    """
    return self.template.get_proxy()
3009
def add_loop_condition(self, condition, position='end'):
    """Attach ``condition`` as the template's loop condition.

    Only the ``'end'`` position is honoured: whatever ``position`` is passed
    (including ``'begin'``), ``'end'`` is forwarded to the template and
    stored on ``self.loop_condition_position``. The parameter is kept for
    interface compatibility.
    """
    # 'begin' is not supported at the moment, so the position is forced.
    effective_position = 'end'
    self.template.add_loop_condition(condition, position=effective_position)
    self.loop_condition_position = effective_position
3016
def refresh(self):
    """Refresh the works of the current run using ``refresh_works`` defaults."""
    refresher = self.refresh_works
    refresher()
3019
def refresh_works(self, clean=False):
    """Refresh the works of the current run, if any run exists.

    :param clean: forwarded unchanged to the run's ``refresh_works``.
    """
    if not self.runs:
        return
    current_run = self.runs[str(self.num_run)]
    current_run.refresh_works(clean=clean)
3023
def sync_works(self, to_cancel=False):
    """Synchronize the current run of this workflow and start the next loop run if due.

    Steps, in order:

    1. Record ``to_cancel`` on ``self.to_cancel`` when it is truthy.
    2. Return early (doing nothing else) if a build work exists and has not
       terminated, or has terminated without finishing.
    3. Refresh works, raise ``self.num_run`` to at least 1, and create the
       run for ``self.num_run`` from ``self.template.copy()`` if it does not
       exist yet.
    4. For a run with a loop condition and ``num_run > 1``, copy the
       ``parameter_links`` metadata item from the previous run into it.
    5. Synchronize the current run, forwarding ``to_cancel``.
    6. If the current run has terminated, ``to_cancel`` is falsy, the run
       has a loop condition and its loop-condition status is truthy,
       increment the run counter and create the next run from the template,
       carrying over ``parameter_links`` and ``global_parameters`` from the
       previous run.
    7. Refresh works again.

    :param to_cancel: truthy to request cancellation; forwarded to the
        current run's ``sync_works``.
    """
    if to_cancel:
        self.to_cancel = to_cancel

    # Leave the runs alone until a build work (if any) has terminated and finished.
    build_work = self.get_build_work()
    if build_work:
        if not build_work.is_terminated():
            return
        elif build_work.is_terminated() and not build_work.is_finished():
            return

    self.refresh_works()

    # Run numbering starts at 1; make sure the current run exists.
    if self.num_run < 1:
        self.num_run = 1
    if str(self.num_run) not in self.runs:
        # parent_num_run is set on the template before copying it
        # (presumably so the copy inherits it -- confirm against copy()).
        self.template.parent_num_run = self.parent_num_run
        self.runs[str(self.num_run)] = self.template.copy()
        if self.runs[str(self.num_run)].has_loop_condition():
            self.runs[str(self.num_run)].num_run = self.num_run


    # Runs on every call, not only when the run was just created: the
    # previous run's parameter_links are re-copied into the current run.
    if self.runs[str(self.num_run)].has_loop_condition():
        if self.num_run > 1:
            p_metadata = self.runs[str(self.num_run - 1)].get_metadata_item('parameter_links')
            self.runs[str(self.num_run)].add_metadata_item('parameter_links', p_metadata)

    self.runs[str(self.num_run)].sync_works(to_cancel=to_cancel)

    if self.runs[str(self.num_run)].is_terminated(synchronize=False, new=True):
        self.logger.info("%s num_run %s is_terminated" % (self.get_internal_id(), self.num_run))
        if to_cancel:
            # Cancellation requested: no new loop run is started in this call.
            self.logger.info("num_run %s, to cancel" % self.num_run)
        else:
            if self.runs[str(self.num_run)].has_loop_condition():
                if self.runs[str(self.num_run)].get_loop_condition_status():
                    self.logger.info("%s num_run %s get_loop_condition_status %s, start next run" % (self.get_internal_id(), self.num_run, self.runs[str(self.num_run)].get_loop_condition_status()))
                    # NOTE(review): increments the private ``_num_run`` while the code
                    # above assigns ``num_run``; presumably ``num_run`` is a property
                    # backed by ``_num_run`` -- confirm. Every ``self.runs[str(self.num_run)]``
                    # below therefore refers to the NEW run.
                    self._num_run += 1
                    self.logger.info("new num_run is %s" % (self.num_run))
                    self.template.parent_num_run = self.parent_num_run
                    self.runs[str(self.num_run)] = self.template.copy()

                    if self.runs[str(self.num_run)].has_loop_condition():
                        self.runs[str(self.num_run)].num_run = self.num_run

                    # Carry parameter_links from the previous run into the new one.
                    p_metadata = self.runs[str(self.num_run - 1)].get_metadata_item('parameter_links')
                    self.runs[str(self.num_run)].add_metadata_item('parameter_links', p_metadata)

                    # Plain assignment: the new run shares the previous run's
                    # global_parameters object (no copy is made).
                    self.runs[str(self.num_run)].global_parameters = self.runs[str(self.num_run - 1)].global_parameters
                    self.logger.info("%s new num_run is %s" % (self.get_internal_id(), self.num_run))
                else:
                    self.logger.info("%s num_run %s get_loop_condition_status %s, terminated loop" % (self.get_internal_id(), self.num_run, self.runs[str(self.num_run)].get_loop_condition_status()))
    self.refresh_works()
3077
def get_relation_map(self):
    """Return the relation map of this workflow.

    * no runs yet: an empty list;
    * template without a loop condition: the current run's relation map;
    * template with a loop condition: a one-element list holding a dict
      that maps every run key to that run's relation map.
    """
    if not self.runs:
        return []
    if not self.template.has_loop_condition():
        return self.runs[str(self.num_run)].get_relation_map()
    per_run = {run_key: self.runs[run_key].get_relation_map() for run_key in self.runs}
    return [per_run]
3088
3089
class SubWorkflow(Workflow):
    """Workflow subclass named for use as a sub-workflow.

    It adds no behaviour of its own: the constructor forwards every
    argument unchanged to ``Workflow.__init__``.
    """

    def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None, logger=None):
        super().__init__(
            name=name,
            workload_id=workload_id,
            lifetime=lifetime,
            pending_time=pending_time,
            logger=logger,
        )
3094
3095
class LoopWorkflow(Workflow):
    """Workflow subclass named for looping workflows.

    It adds no behaviour of its own: the constructor forwards every
    argument unchanged to ``Workflow.__init__``.
    """

    def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None, logger=None):
        super().__init__(
            name=name,
            workload_id=workload_id,
            lifetime=lifetime,
            pending_time=pending_time,
            logger=logger,
        )