Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:33

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020 - 2025
0010 
0011 import copy
0012 import datetime
0013 import json
0014 import logging
0015 import inspect
0016 import os
0017 import random
0018 import uuid
0019 
0020 
0021 from idds.common import exceptions
0022 from idds.common.constants import IDDSEnum, WorkStatus
0023 from idds.common.utils import json_dumps, setup_logging, get_proxy
0024 from idds.common.utils import str_to_date
0025 from .base import Base
0026 from .work import Work, Collection
0027 
0028 
# Call the shared iDDS logging helper with this module's name at import time
# (presumably it configures handlers/levels - see idds.common.utils.setup_logging).
setup_logging(__name__)
0030 
0031 
class ConditionOperator(IDDSEnum):
    """Logical operator used by CompositeCondition to combine its conditions."""
    # And: the composite is true only when every condition is true.
    And = 0
    # Or: the composite is true when at least one condition is true.
    Or = 1
0035 
0036 
class ConditionTrigger(IDDSEnum):
    """Selection mode passed to CompositeCondition.get_next_works."""
    # NotTriggered: return works not yet marked as triggered, without marking them.
    NotTriggered = 0
    # ToTrigger: return works not yet marked as triggered and mark them as triggered.
    ToTrigger = 1
    # Triggered: return works that are already marked as triggered.
    Triggered = 2
0041 
0042 
0043 class CompositeCondition(Base):
0044     def __init__(self, operator=ConditionOperator.And, conditions=[], true_works=None, false_works=None, logger=None):
0045         self._conditions = []
0046         self._true_works = []
0047         self._false_works = []
0048 
0049         super(CompositeCondition, self).__init__()
0050 
0051         self.internal_id = str(uuid.uuid4())[:8]
0052         self.template_id = self.internal_id
0053         # self.template_id = str(uuid.uuid4())[:8]
0054 
0055         self.logger = logger
0056         if self.logger is None:
0057             self.setup_logger()
0058 
0059         if conditions is None:
0060             conditions = []
0061         if true_works is None:
0062             true_works = []
0063         if false_works is None:
0064             false_works = []
0065         if conditions and type(conditions) not in [tuple, list]:
0066             conditions = [conditions]
0067         if true_works and type(true_works) not in [tuple, list]:
0068             true_works = [true_works]
0069         if false_works and type(false_works) not in [tuple, list]:
0070             false_works = [false_works]
0071         self.validate_conditions(conditions)
0072 
0073         self.operator = operator
0074         self.conditions = []
0075         self.true_works = []
0076         self.false_works = []
0077 
0078         self.conditions = conditions
0079         self.true_works = true_works
0080         self.false_works = false_works
0081 
0082     def get_class_name(self):
0083         return self.__class__.__name__
0084 
    def get_internal_id(self):
        """Return the internal id (set in __init__ to the first 8 characters of a uuid4)."""
        return self.internal_id
0087 
    def get_template_id(self):
        """Return the template id (initialised in __init__ to the internal id)."""
        return self.template_id
0090 
0091     def copy(self):
0092         new_cond = copy.deepcopy(self)
0093         return new_cond
0094 
0095     def __deepcopy__(self, memo):
0096         logger = self.logger
0097         self.logger = None
0098 
0099         cls = self.__class__
0100         result = cls.__new__(cls)
0101 
0102         memo[id(self)] = result
0103 
0104         # Deep copy all other attributes
0105         for k, v in self.__dict__.items():
0106             setattr(result, k, copy.deepcopy(v, memo))
0107 
0108         self.logger = logger
0109         result.logger = logger
0110         return result
0111 
0112     def setup_logger(self):
0113         # Setup logger
0114         self.logger = logging.getLogger(self.get_class_name())
0115 
0116     def log_info(self, info):
0117         if self.logger is None:
0118             self.setup_logger()
0119         self.logger.info(info)
0120 
0121     def log_debug(self, info):
0122         if self.logger is None:
0123             self.setup_logger()
0124         self.logger.debug(info)
0125 
0126     def log_error(self, info):
0127         if self.logger is None:
0128             self.setup_logger()
0129         self.logger.error(info)
0130 
    @property
    def conditions(self):
        """The list of conditions, kept on the instance in ``_conditions``."""
        # Unlike true_works/false_works, the setter below writes no metadata.
        return self._conditions

    @conditions.setter
    def conditions(self, value):
        """Replace the stored conditions with ``value`` (stored by reference, not validated)."""
        self._conditions = value
0139 
0140     @property
0141     def true_works(self):
0142         # return self.get_metadata_item('true_works', [])
0143         return self._true_works
0144 
0145     @true_works.setter
0146     def true_works(self, value):
0147         self._true_works = value
0148         true_work_meta = self.get_metadata_item('true_works', {})
0149         for work in value:
0150             if work is None:
0151                 continue
0152             if isinstance(work, Work):
0153                 if work.get_internal_id() not in true_work_meta:
0154                     true_work_meta[work.get_internal_id()] = {'triggered': False}
0155             elif isinstance(work, CompositeCondition):
0156                 if work.get_internal_id() not in true_work_meta:
0157                     true_work_meta[work.get_internal_id()] = {'triggered': False}
0158             elif isinstance(work, Workflow):
0159                 if work.get_internal_id() not in true_work_meta:
0160                     true_work_meta[work.get_internal_id()] = {'triggered': False}
0161         self.add_metadata_item('true_works', true_work_meta)
0162 
0163     @property
0164     def false_works(self):
0165         # return self.get_metadata_item('false_works', [])
0166         return self._false_works
0167 
0168     @false_works.setter
0169     def false_works(self, value):
0170         self._false_works = value
0171         false_work_meta = self.get_metadata_item('false_works', {})
0172         for work in value:
0173             if work is None:
0174                 continue
0175             if isinstance(work, Work):
0176                 if work.get_internal_id() not in false_work_meta:
0177                     false_work_meta[work.get_internal_id()] = {'triggered': False}
0178             elif isinstance(work, CompositeCondition):
0179                 if work.get_internal_id() not in false_work_meta:
0180                     false_work_meta[work.get_internal_id()] = {'triggered': False}
0181             elif isinstance(work, Workflow):
0182                 if work.get_internal_id() not in false_work_meta:
0183                     false_work_meta[work.get_internal_id()] = {'triggered': False}
0184         self.add_metadata_item('false_works', false_work_meta)
0185 
0186     def validate_conditions(self, conditions):
0187         if type(conditions) not in [tuple, list]:
0188             raise exceptions.IDDSException("conditions must be list")
0189         for cond in conditions:
0190             assert (inspect.ismethod(cond))
0191 
0192     def add_condition(self, cond):
0193         assert (inspect.ismethod(cond))
0194         assert (isinstance(cond.__self__, Work))
0195         # self.conditions.append({'condition': cond, 'current_work': cond.__self__})
0196 
0197         self._conditions.append(cond)
0198 
    def load_metadata(self):
        """Do nothing; conditions and works are kept as instance attributes."""
        # The disabled lines below show the metadata keys that were once read here.
        # conditions = self.get_metadata_item('conditions', [])
        # true_works_meta = self.get_metadata_item('true_works', {})
        # false_works_meta = self.get_metadata_item('false_works', {})
        pass
0204 
0205     def to_dict(self):
0206         # print('to_dict')
0207         ret = {'class': self.__class__.__name__,
0208                'module': self.__class__.__module__,
0209                'attributes': {}}
0210         for key, value in self.__dict__.items():
0211             # print(key)
0212             # print(value)
0213             # if not key.startswith('__') and not key.startswith('_'):
0214             if not key.startswith('__'):
0215                 if key == 'logger':
0216                     value = None
0217                 elif key == '_conditions':
0218                     new_value = []
0219                     for cond in value:
0220                         if inspect.ismethod(cond):
0221                             if isinstance(cond.__self__, Work):
0222                                 new_cond = {'idds_method': cond.__name__,
0223                                             'idds_method_internal_id': cond.__self__.get_internal_id()}
0224                             elif isinstance(cond.__self__, CompositeCondition):
0225                                 new_cond = {'idds_method': cond.__name__,
0226                                             'idds_method_condition': cond.__self__.to_dict()}
0227                             elif isinstance(cond.__self__, Workflow):
0228                                 new_cond = {'idds_method': cond.__name__,
0229                                             'idds_method_internal_id': cond.__self__.get_internal_id()}
0230                             else:
0231                                 new_cond = {'idds_method': cond.__name__,
0232                                             'idds_method_internal_id': cond.__self__.get_internal_id()}
0233                         else:
0234                             if hasattr(cond, '__self__'):
0235                                 new_cond = {'idds_attribute': cond.__name__,
0236                                             'idds_method_internal_id': cond.__self__.get_internal_id()}
0237                             else:
0238                                 new_cond = cond
0239                         new_value.append(new_cond)
0240                     value = new_value
0241                 elif key in ['_true_works', '_false_works']:
0242                     new_value = []
0243                     for w in value:
0244                         if isinstance(w, Work):
0245                             new_w = w.get_internal_id()
0246                         elif isinstance(w, CompositeCondition):
0247                             new_w = w.to_dict()
0248                         elif isinstance(w, Workflow):
0249                             # new_w = w.to_dict()
0250                             new_w = w.get_internal_id()
0251                         else:
0252                             new_w = w
0253                         new_value.append(new_w)
0254                     value = new_value
0255                 else:
0256                     value = self.to_dict_l(value)
0257                 ret['attributes'][key] = value
0258         return ret
0259 
    def get_work_from_id(self, work_id, works):
        """
        Look up ``work_id`` in ``works`` by subscription.

        :param work_id: key of the work (an internal id in load_conditions).
        :param works: container indexed by ``work_id``.
        """
        # NOTE(review): callers in load_conditions test the result for None,
        # but a plain dict raises KeyError for a missing id - confirm which
        # behaviour is intended before relying on those branches.
        return works[work_id]
0262 
0263     def clean(self):
0264         for work in self.true_works:
0265             if isinstance(work, CompositeCondition):
0266                 work.clean()
0267             else:
0268                 true_work_meta = self.get_metadata_item('true_works', {})
0269                 if work.get_internal_id() in true_work_meta:
0270                     if true_work_meta[work.get_internal_id()]['triggered'] and not work.submitted:
0271                         true_work_meta[work.get_internal_id()]['triggered'] = False
0272 
    def load_conditions(self, works):
        """
        Resolve serialized references against ``works``.

        Conditions that are not callable are expected to be the reference
        dicts written by to_dict; they are turned back into attributes of the
        referenced work. String entries in true_works/false_works are treated
        as work ids and replaced by the work found in ``works``. Nested
        conditions are loaded recursively. Entries that cannot be resolved
        are logged and kept unchanged.

        :param works: container of works indexed by internal id
                      (see get_work_from_id).
        """
        # print("load_conditions")
        self.log_debug("load_conditions conditions: %s" % self.conditions)
        self.log_debug("load_conditions works: %s" % str(works))
        new_conditions = []
        for cond in self.conditions:
            if callable(cond):
                # Already a live callable, nothing to resolve.
                new_conditions.append(cond)
            else:
                if 'idds_method' in cond and 'idds_method_internal_id' in cond:
                    # Reference to a method of a work.
                    self.log_debug("idds_method_internal_id: %s" % cond['idds_method_internal_id'])
                    self.log_debug("idds_method: %s" % cond['idds_method'])

                    internal_id = cond['idds_method_internal_id']
                    work = self.get_work_from_id(internal_id, works)

                    self.log_debug("get_work_from_id: %s: [%s]" % (internal_id, [work]))

                    if work is not None:
                        new_cond = getattr(work, cond['idds_method'])
                    else:
                        self.log_error("Condition method work cannot be found for %s" % (internal_id))
                        new_cond = cond
                elif 'idds_attribute' in cond and 'idds_method_internal_id' in cond:
                    # Reference to a non-method attribute of a work.
                    internal_id = cond['idds_method_internal_id']
                    work = self.get_work_from_id(internal_id, works)
                    if work is not None:
                        new_cond = getattr(work, cond['idds_attribute'])
                    else:
                        self.log_error("Condition attribute work cannot be found for %s" % (internal_id))
                        new_cond = cond
                elif 'idds_method' in cond and 'idds_method_condition' in cond:
                    # Reference to a method of a nested condition.
                    # NOTE(review): the stored value is used as an object here;
                    # presumably it was already rebuilt from its dict form - confirm.
                    new_cond = cond['idds_method_condition']
                    new_cond = getattr(new_cond, cond['idds_method'])
                else:
                    new_cond = cond
                new_conditions.append(new_cond)
        self.conditions = new_conditions

        new_true_works = []
        self.log_debug("true_works: %s" % str(self.true_works))

        for w in self.true_works:
            # self.log_debug("true_work: %s" % str(w))
            if isinstance(w, CompositeCondition):
                # work = w.load_conditions(works, works_template)
                w.load_conditions(works)
                work = w
            elif isinstance(w, Workflow):
                work = w
            elif isinstance(w, Work):
                work = w
            elif type(w) in [str]:
                # A string is taken as the internal id of a work.
                work = self.get_work_from_id(w, works)
                if work is None:
                    self.log_error("True work cannot be found for %s" % str(w))
                    work = w
            else:
                self.log_error("True work cannot be found for type(%s): %s" % (type(w), str(w)))
                work = w
            new_true_works.append(work)
        # The setter also registers the resolved works in the metadata.
        self.true_works = new_true_works

        new_false_works = []
        for w in self.false_works:
            if isinstance(w, CompositeCondition):
                # work = w.load_condtions(works, works_template)
                w.load_conditions(works)
                work = w
            elif isinstance(w, Workflow):
                work = w
            elif isinstance(w, Work):
                work = w
            elif type(w) in [str]:
                # A string is taken as the internal id of a work.
                work = self.get_work_from_id(w, works)
                if work is None:
                    self.log_error("False work cannot be found for type(%s): %s" % (type(w), str(w)))
                    work = w
            else:
                self.log_error("False work cannot be found for %s" % str(w))
                work = w
            new_false_works.append(work)
        # The setter also registers the resolved works in the metadata.
        self.false_works = new_false_works
0356 
0357     def all_works(self):
0358         works = []
0359         works = works + self.all_pre_works()
0360         works = works + self.all_next_works()
0361         return works
0362 
0363     def all_condition_ids(self):
0364         works = []
0365         for cond in self.conditions:
0366             if inspect.ismethod(cond):
0367                 if isinstance(cond.__self__, Work) or isinstance(cond.__self__, Workflow):
0368                     works.append(cond.__self__.get_internal_id())
0369                 elif isinstance(cond.__self__, CompositeCondition):
0370                     works = works + cond.__self__.all_condition_ids()
0371             else:
0372                 self.log_error("cond cannot be recognized: %s" % str(cond))
0373                 works.append(cond)
0374         for work in self.true_works + self.false_works:
0375             if isinstance(work, CompositeCondition):
0376                 works = works + work.all_condition_ids()
0377         return works
0378 
0379     def all_pre_works(self):
0380         works = []
0381         for cond in self.conditions:
0382             if inspect.ismethod(cond):
0383                 if isinstance(cond.__self__, Work) or isinstance(cond.__self__, Workflow):
0384                     works.append(cond.__self__)
0385                 elif isinstance(cond.__self__, CompositeCondition):
0386                     works = works + cond.__self__.all_pre_works()
0387             else:
0388                 self.log_error("cond cannot be recognized: %s" % str(cond))
0389                 works.append(cond)
0390         for work in self.true_works + self.false_works:
0391             if isinstance(work, CompositeCondition):
0392                 works = works + work.all_pre_works()
0393         return works
0394 
0395     def all_next_works(self):
0396         works = []
0397         for work in self.true_works + self.false_works:
0398             if isinstance(work, CompositeCondition):
0399                 works = works + work.all_next_works()
0400             else:
0401                 works.append(work)
0402         return works
0403 
0404     def get_current_cond_status(self, cond):
0405         if callable(cond):
0406             if cond():
0407                 return True
0408             else:
0409                 return False
0410         else:
0411             if cond:
0412                 return True
0413             else:
0414                 return False
0415 
0416     def get_cond_status(self):
0417         if self.operator == ConditionOperator.And:
0418             for cond in self.conditions:
0419                 cond_status = self.get_current_cond_status(cond)
0420                 # self.log_debug(f"And cond {cond} status: {cond_status}")
0421                 if not cond_status:
0422                     return False
0423             return True
0424         else:
0425             for cond in self.conditions:
0426                 if self.get_current_cond_status(cond):
0427                     return True
0428             return False
0429 
    def get_condition_status(self):
        """Return the combined condition status; alias of get_cond_status."""
        return self.get_cond_status()
0432 
0433     def is_condition_true(self):
0434         if self.get_cond_status():
0435             return True
0436         return False
0437 
0438     def is_condition_false(self):
0439         if not self.get_cond_status():
0440             return True
0441         return False
0442 
0443     def get_next_works(self, trigger=ConditionTrigger.NotTriggered):
0444         works = []
0445         self.log_debug(f"get_next_works get_cond_status: {self.get_cond_status()} trigger: {trigger}")
0446         if self.get_cond_status():
0447             true_work_meta = self.get_metadata_item('true_works', {})
0448             self.log_debug(f"get_next_works true_work_meta: {true_work_meta}")
0449             for work in self.true_works:
0450                 if isinstance(work, CompositeCondition):
0451                     works = works + work.get_next_works(trigger=trigger)
0452                 else:
0453                     # self.log_debug("true work: %s" % str(work))
0454                     if work.get_internal_id() not in true_work_meta:
0455                         true_work_meta[work.get_internal_id()] = {'triggered': False}
0456                     if trigger == ConditionTrigger.ToTrigger:
0457                         if not true_work_meta[work.get_internal_id()]['triggered']:
0458                             true_work_meta[work.get_internal_id()]['triggered'] = True
0459                             works.append(work)
0460                     elif trigger == ConditionTrigger.NotTriggered:
0461                         if not true_work_meta[work.get_internal_id()]['triggered']:
0462                             works.append(work)
0463                     elif trigger == ConditionTrigger.Triggered:
0464                         if true_work_meta[work.get_internal_id()]['triggered']:
0465                             works.append(work)
0466             self.add_metadata_item('true_works', true_work_meta)
0467         else:
0468             false_work_meta = self.get_metadata_item('false_works', {})
0469             self.log_debug(f"get_next_works false_work_meta: {false_work_meta}")
0470             for work in self.false_works:
0471                 if isinstance(work, CompositeCondition):
0472                     works = works + work.get_next_works(trigger=trigger)
0473                 else:
0474                     if work.get_internal_id() not in false_work_meta:
0475                         false_work_meta[work.get_internal_id()] = {'triggered': False}
0476                     if trigger == ConditionTrigger.ToTrigger:
0477                         if not false_work_meta[work.get_internal_id()]['triggered']:
0478                             false_work_meta[work.get_internal_id()]['triggered'] = True
0479                             works.append(work)
0480                     elif trigger == ConditionTrigger.NotTriggered:
0481                         if not false_work_meta[work.get_internal_id()]['triggered']:
0482                             works.append(work)
0483                     elif trigger == ConditionTrigger.Triggered:
0484                         if false_work_meta[work.get_internal_id()]['triggered']:
0485                             works.append(work)
0486             self.add_metadata_item('false_works', false_work_meta)
0487         return works
0488 
0489 
class AndCondition(CompositeCondition):
    """CompositeCondition that is true only when all of its conditions are true."""

    def __init__(self, conditions=None, true_works=None, false_works=None, logger=None):
        """
        Init an And condition.

        :param conditions: a bound method or a list/tuple of bound methods.
        :param true_works: work(s) followed when the condition is true.
        :param false_works: work(s) followed when the condition is false.
        :param logger: logger to use.
        """
        # ``conditions`` defaults to None instead of a shared list object: the
        # base class stores the container it receives and add_condition()
        # appends to it, which would leak conditions between instances.
        super(AndCondition, self).__init__(operator=ConditionOperator.And,
                                           conditions=conditions,
                                           true_works=true_works,
                                           false_works=false_works,
                                           logger=logger)
0497 
0498 
class OrCondition(CompositeCondition):
    """CompositeCondition that is true when at least one of its conditions is true."""

    def __init__(self, conditions=None, true_works=None, false_works=None, logger=None):
        """
        Init an Or condition.

        :param conditions: a bound method or a list/tuple of bound methods.
        :param true_works: work(s) followed when the condition is true.
        :param false_works: work(s) followed when the condition is false.
        :param logger: logger to use.
        """
        # ``conditions`` defaults to None instead of a shared list object: the
        # base class stores the container it receives and add_condition()
        # appends to it, which would leak conditions between instances.
        super(OrCondition, self).__init__(operator=ConditionOperator.Or,
                                          conditions=conditions,
                                          true_works=true_works,
                                          false_works=false_works,
                                          logger=logger)
0506 
0507 
class Condition(CompositeCondition):
    """And-composite built from at most one condition, one true work and one false work."""

    def __init__(self, cond=None, current_work=None, true_work=None, false_work=None, logger=None):
        """
        Init a single condition.

        :param cond: the condition; ignored when falsy.
        :param current_work: accepted but not used by this constructor.
        :param true_work: work followed when the condition is true; ignored when falsy.
        :param false_work: work followed when the condition is false; ignored when falsy.
        :param logger: logger to use.
        """
        conditions = []
        if cond:
            conditions.append(cond)
        true_works = []
        if true_work:
            true_works.append(true_work)
        false_works = []
        if false_work:
            false_works.append(false_work)
        super(Condition, self).__init__(operator=ConditionOperator.And,
                                        conditions=conditions,
                                        true_works=true_works,
                                        false_works=false_works,
                                        logger=logger)

    # to support load from old conditions
    @property
    def cond(self):
        """The first stored condition, or None when there is none."""
        if len(self.conditions) >= 1:
            return self.conditions[0]
        return None

    @cond.setter
    def cond(self, value):
        """Replace the conditions with the single condition ``value``."""
        self.conditions = [value]

    @property
    def true_work(self):
        """The whole list of true works (not its first item), or None when it is empty."""
        if len(self.true_works) >= 1:
            return self.true_works
        return None

    @true_work.setter
    def true_work(self, value):
        """Replace the true works with the single work ``value``."""
        self.true_works = [value]

    @property
    def false_work(self):
        """The whole list of false works (not its first item), or None when it is empty."""
        if len(self.false_works) >= 1:
            return self.false_works
        return None

    @false_work.setter
    def false_work(self, value):
        """Replace the false works with the single work ``value``."""
        self.false_works = [value]
0543 
0544 
class TemplateCondition(CompositeCondition):
    """And-composite restricted to one Work-bound condition and Work-only branches."""

    def __init__(self, cond=None, current_work=None, true_work=None, false_work=None, logger=None):
        """
        Init a template condition.

        :param cond: bound method of a Work; ignored when falsy.
        :param current_work: accepted but not used by this constructor.
        :param true_work: Work followed when the condition is true.
        :param false_work: Work followed when the condition is false.
        :param logger: logger to use.

        :raises IDDSException: if ``true_work`` or ``false_work`` is set to
                               something that is not a Work.
        """
        if true_work is not None and not isinstance(true_work, Work):
            raise exceptions.IDDSException("true_work can only be set with Work class")
        if false_work is not None and not isinstance(false_work, Work):
            raise exceptions.IDDSException("false_work can only be set with Work class")

        super(TemplateCondition, self).__init__(operator=ConditionOperator.And,
                                                conditions=[cond] if cond else [],
                                                true_works=[true_work] if true_work else [],
                                                false_works=[false_work] if false_work else [],
                                                logger=logger)

    def validate_conditions(self, conditions):
        """
        Check that ``conditions`` holds at most one bound method of a Work.

        :raises IDDSException: if ``conditions`` is neither a list nor a tuple,
                               or holds more than one item.
        :raises AssertionError: if the item is not a bound method of a Work.
        """
        if type(conditions) not in [tuple, list]:
            raise exceptions.IDDSException("conditions must be list")
        if len(conditions) > 1:
            raise exceptions.IDDSException("Condition class can only support one condition. To support multiple condition, please use CompositeCondition.")
        for cond in conditions:
            # Raise explicitly instead of using ``assert`` so that the checks
            # are still enforced when Python runs with -O; the exception type
            # is kept for callers that catch AssertionError.
            if not inspect.ismethod(cond):
                raise AssertionError("condition must be a bound method: %s" % str(cond))
            if not isinstance(cond.__self__, Work):
                raise AssertionError("condition must be a method of a Work: %s" % str(cond))

    def add_condition(self, cond):
        """Always raise IDDSException: this class holds a single condition."""
        raise exceptions.IDDSException("Condition class doesn't support add_condition. To support multiple condition, please use CompositeCondition.")
0569 
0570 
class ParameterLink(Base):
    """Named parameters read from one work and handed over under a destination name."""

    def __init__(self, parameters):
        """
        Init a parameter link.

        :param parameters: a string, a dict with the keys 'source' and
                           'destination', or a list/tuple of those. A string
                           is used as both source and destination. Falsy
                           items are skipped.

        :raises Exception: if an item has any other format.
        """
        super(ParameterLink, self).__init__()
        # Maps the running index (as a string) to {'source': ..., 'destination': ...}.
        self.parameters = {}
        self.num_parameters = 0
        if parameters:
            if type(parameters) not in [list, tuple]:
                parameters = [parameters]
            for p in parameters:
                if p:
                    if type(p) in [str]:
                        self.parameters[str(self.num_parameters)] = {'source': p, 'destination': p}
                        self.num_parameters += 1
                    elif type(p) in [dict] and 'source' in p and 'destination' in p:
                        self.parameters[str(self.num_parameters)] = {'source': p['source'], 'destination': p['destination']}
                        self.num_parameters += 1
                    else:
                        raise Exception("Cannot parse the parameters format. Accepted format: list of string or dict{'source': <>, 'destination': <>}")

        self.internal_id = str(uuid.uuid4())[:8]
        self.template_id = self.internal_id

    def get_internal_id(self):
        """Return the internal id (first 8 characters of a uuid4)."""
        return self.internal_id

    def get_parameter_value(self, work, p):
        """
        Read the attribute ``p`` from ``work``.

        A callable attribute is called and its result is used. A missing or
        falsy attribute yields None. A Collection result is converted with
        ``to_origin_dict``.

        :param work: object to read from.
        :param p: attribute name.
        """
        # The default must be None, not the string 'None': a string default is
        # truthy and was returned as the value of a missing attribute.
        p_f = getattr(work, p, None)
        if not p_f:
            return None
        ret = p_f() if callable(p_f) else p_f
        if ret and type(ret) in [Collection] and hasattr(ret, 'to_origin_dict'):
            ret = ret.to_origin_dict()
        return ret

    def set_parameters(self, work):
        """Read every source parameter from ``work`` and store the values in the 'parameters' metadata."""
        p_values = {}
        for p in self.parameters:
            p_values[p] = self.get_parameter_value(work, self.parameters[p]['source'])
        self.add_metadata_item('parameters', p_values)

    def get_parameters(self):
        """Return the stored values keyed by destination name; parameters without a stored value are omitted."""
        p_values = self.get_metadata_item('parameters', {})
        ret = {}
        for p in self.parameters:
            if p in p_values:
                ret[self.parameters[p]['destination']] = p_values[p]
        return ret
0623 
0624 
0625 class WorkflowBase(Base):
0626 
    def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None, logger=None):
        """
        Init a workflow.

        :param name: workflow name; when empty a name of the form
                     'idds.workflow.<UTC timestamp><random 1..1000>' is generated.
        :param workload_id: stored as-is (None is kept).
        :param lifetime: stored as-is.
        :param pending_time: stored as-is.
        :param logger: logger to use; setup_logger() is called when None.
        """
        # Private containers are created before the base initializer runs.
        self._works = {}
        self._conditions = {}
        self._work_conds = {}

        self.parameter_links = {}
        self.parameter_links_source = {}
        self.parameter_links_destination = {}

        self._global_parameters = {}

        super(WorkflowBase, self).__init__()

        self.internal_id = str(uuid.uuid4())[:8]
        self.template_work_id = self.internal_id
        # self.template_work_id = str(uuid.uuid4())[:8]
        self.lifetime = lifetime
        self.pending_time = pending_time

        if name:
            # self._name = name + "." + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S_%f") + str(random.randint(1, 1000))
            self._name = name
        else:
            self._name = 'idds.workflow.' + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S_%f") + str(random.randint(1, 1000))

        if workload_id is None:
            # workload_id = int(time.time())
            pass
        self.workload_id = workload_id

        self.logger = logger
        if self.logger is None:
            self.setup_logger()

        self.build_work = None
        self._works = {}
        # NOTE(review): 'works' is presumably a property defined further down
        # in the class (outside this view) - confirm before reordering.
        self.works = {}
        self.work_sequence = {}  # order list

        self.next_works = []

        self.terminated_works = []
        self.initial_works = []
        # if the primary initial_work is not set, it's the first initial work.
        self.primary_initial_work = None
        self.independent_works = []

        self.first_initial = False
        self.new_to_run_works = []
        self.submitting_works = []
        self.current_running_works = []

        # Counters, all starting at zero.
        self.num_subfinished_works = 0
        self.num_finished_works = 0
        self.num_failed_works = 0
        self.num_cancelled_works = 0
        self.num_suspended_works = 0
        self.num_expired_works = 0
        self.num_total_works = 0

        self.last_work = None

        # Naive UTC timestamp of construction.
        self.last_updated_at = datetime.datetime.utcnow()
        self.expired = False

        self.to_update_transforms = {}

        # user defined Condition class
        self.user_defined_conditions = {}

        self.username = None
        self.userdn = None
        self.proxy = None

        self._loop_condition_position = 'end'
        self.loop_condition = None

        self.num_run = None

        self.global_parameters = {}

        self.to_cancel = False

        self.synchronized = False

        self.additional_data_storage = None

        # The campaign attributes below go through the properties of this class.
        self.campaign = None
        self.campaign_scope = None
        self.campaign_group = None
        self.campaign_tag = None
        self.max_processing_requests = -1

        self._request_cache = None

        self.with_steps = False

        # These three also go through the properties of this class.
        self.is_workflow_step = False
        self.step_name = None
        self.workflow_data = None

        # Disabled code kept as a bare string literal; it has no runtime effect.
        """
        self._running_data_names = []
        for name in ['internal_id', 'template_work_id', 'workload_id', 'work_sequence', 'terminated_works',
                     'first_initial', 'new_to_run_works', 'current_running_works',
                     'num_subfinished_works', 'num_finished_works', 'num_failed_works', 'num_cancelled_works', 'num_suspended_works',
                     'num_expired_works', 'num_total_works', 'last_work']:
            self._running_data_names.append(name)
        for name in ['works']:
            self._running_data_names.append(name)
        """
0741 
    @property
    def name(self):
        """Return the value held in ``self._name``."""
        return self._name

    @name.setter
    def name(self, value):
        """Store ``value`` in ``self._name``."""
        self._name = value
0749 
    @property
    def is_workflow_step(self):
        """Return the flag held in ``self._is_workflow_step``."""
        return self._is_workflow_step

    @is_workflow_step.setter
    def is_workflow_step(self, value):
        """Store ``value`` in ``self._is_workflow_step``."""
        self._is_workflow_step = value
0757 
    @property
    def step_name(self):
        """Return the value held in ``self._step_name``."""
        return self._step_name

    @step_name.setter
    def step_name(self, value):
        """Store ``value`` in ``self._step_name``."""
        self._step_name = value
0765 
    @property
    def workflow_data(self):
        """Return the value held in ``self._workflow_data``."""
        return self._workflow_data

    @workflow_data.setter
    def workflow_data(self, value):
        """Store ``value`` in ``self._workflow_data``."""
        self._workflow_data = value
0773 
    @property
    def campaign(self):
        """Return the value held in ``self._campaign``."""
        return self._campaign

    @campaign.setter
    def campaign(self, value):
        """Store ``value`` in ``self._campaign``."""
        self._campaign = value
0781 
    @property
    def campaign_scope(self):
        """Return the value held in ``self._campaign_scope``."""
        return self._campaign_scope

    @campaign_scope.setter
    def campaign_scope(self, value):
        """Store ``value`` in ``self._campaign_scope``."""
        self._campaign_scope = value
0789 
    @property
    def campaign_group(self):
        """Return the value held in ``self._campaign_group``."""
        return self._campaign_group

    @campaign_group.setter
    def campaign_group(self, value):
        """Store ``value`` in ``self._campaign_group``."""
        self._campaign_group = value
0797 
    @property
    def campaign_tag(self):
        """Return the value held in ``self._campaign_tag``."""
        return self._campaign_tag

    @campaign_tag.setter
    def campaign_tag(self, value):
        """Store ``value`` in ``self._campaign_tag``."""
        self._campaign_tag = value
0805 
    @property
    def request_cache(self):
        """Return the value held in ``self._request_cache``."""
        return self._request_cache

    @request_cache.setter
    def request_cache(self, value):
        """Store ``value`` in ``self._request_cache``."""
        self._request_cache = value
0813 
    def set_request_cache(self, value):
        """Method-style setter: assign ``value`` to the ``request_cache`` property."""
        self.request_cache = value
0816 
    @property
    def max_processing_requests(self):
        """Return the value held in ``self._max_processing_requests``."""
        return self._max_processing_requests

    @max_processing_requests.setter
    def max_processing_requests(self, value):
        """Store ``value`` in ``self._max_processing_requests``."""
        self._max_processing_requests = value
0824 
    def get_template_work_id(self):
        """Return ``self.template_work_id``."""
        return self.template_work_id
0827 
    def get_template_id(self):
        """Return ``self.template_work_id``.

        NOTE(review): this returns the same attribute as
        ``get_template_work_id``; presumably an intentional alias -- confirm.
        """
        return self.template_work_id
0830 
    @property
    def workload_id(self):
        """Return the 'workload_id' metadata item (no explicit default is passed)."""
        return self.get_metadata_item('workload_id')

    @workload_id.setter
    def workload_id(self, value):
        """Store ``value`` as the 'workload_id' metadata item."""
        self.add_metadata_item('workload_id', value)
0838 
    @property
    def lifetime(self):
        """Return ``self._lifetime``, or None when the attribute was never set."""
        # return self.get_metadata_item('lifetime', None)
        return getattr(self, '_lifetime', None)

    @lifetime.setter
    def lifetime(self, value):
        """Store ``value`` in ``self._lifetime`` (kept as a plain attribute, not in metadata)."""
        # self.add_metadata_item('lifetime', value)
        self._lifetime = value
0848 
    @property
    def pending_time(self):
        """Return ``self._pending_time``, or None when the attribute was never set."""
        # return self.get_metadata_item('pending_time', None)
        return getattr(self, '_pending_time', None)

    @pending_time.setter
    def pending_time(self, value):
        """Store ``value`` in ``self._pending_time`` (kept as a plain attribute, not in metadata)."""
        # self.add_metadata_item('pending_time', value)
        self._pending_time = value
0858 
0859     @property
0860     def last_updated_at(self):
0861         last_updated_at = self.get_metadata_item('last_updated_at', None)
0862         if last_updated_at and type(last_updated_at) in [str]:
0863             last_updated_at = str_to_date(last_updated_at)
0864         return last_updated_at
0865 
0866     @last_updated_at.setter
0867     def last_updated_at(self, value):
0868         self.add_metadata_item('last_updated_at', value)
0869 
0870     def has_new_updates(self):
0871         self.last_updated_at = datetime.datetime.utcnow()
0872 
0873     @property
0874     def expired(self):
0875         t = self.get_metadata_item('expired', False)
0876         if type(t) in [bool]:
0877             return t
0878         elif type(t) in [str] and t.lower() in ['true']:
0879             return True
0880         else:
0881             return False
0882 
0883     @expired.setter
0884     def expired(self, value):
0885         self.add_metadata_item('expired', value)
0886 
0887     @property
0888     def works(self):
0889         return self._works
0890 
0891     @works.setter
0892     def works(self, value):
0893         self._works = value
0894         work_metadata = {}
0895         if self._works:
0896             for k in self._works:
0897                 work = self._works[k]
0898                 if isinstance(work, Workflow):
0899                     work_metadata[k] = {'type': 'workflow',
0900                                         'metadata': work.metadata}
0901                 else:
0902                     work_metadata[k] = {'type': 'work',
0903                                         'work_id': work.work_id,
0904                                         'workload_id': work.workload_id,
0905                                         'external_id': work.external_id,
0906                                         'status': work.status.value if work.status else work.status,
0907                                         'substatus': work.substatus.value if work.substatus else work.substatus,
0908                                         'next_works': work.next_works,
0909                                         'signature': work.signature,
0910                                         'transforming': work.transforming}
0911         self.add_metadata_item('works', work_metadata)
0912 
0913     def refresh_works(self, clean=False):
0914         work_metadata = {}
0915         if clean:
0916             self.submitting_works = []
0917             for cond_id in self.conditions:
0918                 cond = self.conditions[cond_id]
0919                 cond.clean()
0920 
0921         if self._works:
0922             for k in self._works:
0923                 work = self._works[k]
0924                 if isinstance(work, Workflow):
0925                     work.refresh_works()
0926                     work_metadata[k] = {'type': 'workflow',
0927                                         'metadata': work.metadata}
0928                 else:
0929                     work_metadata[k] = {'type': 'work',
0930                                         'work_id': work.work_id,
0931                                         'workload_id': work.workload_id,
0932                                         'external_id': work.external_id,
0933                                         'status': work.status.value if work.status else work.status,
0934                                         'substatus': work.substatus.value if work.substatus else work.substatus,
0935                                         'next_works': work.next_works,
0936                                         'signature': work.signature,
0937                                         'transforming': work.transforming}
0938                 if work.last_updated_at and (not self.last_updated_at or work.last_updated_at > self.last_updated_at):
0939                     self.last_updated_at = work.last_updated_at
0940         self.add_metadata_item('works', work_metadata)
0941 
0942     def load_works(self):
0943         work_metadata = self.get_metadata_item('works', {})
0944         for k in self._works:
0945             if k in work_metadata:
0946                 if work_metadata[k]['type'] == 'work':
0947                     self._works[k].work_id = work_metadata[k]['work_id']
0948                     self._works[k].workload_id = work_metadata[k]['workload_id'] if 'workload_id' in work_metadata[k] else None
0949                     self._works[k].external_id = work_metadata[k]['external_id'] if 'external_id' in work_metadata[k] else None
0950                     self._works[k].transforming = work_metadata[k]['transforming']
0951                     if 'signature' in work_metadata[k]:
0952                         self._works[k].signature = work_metadata[k]['signature']
0953                     self._works[k].status = WorkStatus(work_metadata[k]['status']) if work_metadata[k]['status'] else work_metadata[k]['status']
0954                     self._works[k].substatus = WorkStatus(work_metadata[k]['substatus']) if work_metadata[k]['substatus'] else work_metadata[k]['substatus']
0955                     self._works[k].next_works = work_metadata[k]['next_works'] if 'next_works' in work_metadata[k] else []
0956                 elif work_metadata[k]['type'] == 'workflow':
0957                     self._works[k].metadata = work_metadata[k]['metadata']
0958 
0959             work = self._works[k]
0960             if work.last_updated_at and (not self.last_updated_at or work.last_updated_at > self.last_updated_at):
0961                 self.last_updated_at = work.last_updated_at
0962 
    @property
    def next_works(self):
        """Return the 'next_works' metadata item, passing [] as the default to get_metadata_item."""
        return self.get_metadata_item('next_works', [])

    @next_works.setter
    def next_works(self, value):
        """Store ``value`` as the 'next_works' metadata item."""
        self.add_metadata_item('next_works', value)
0970 
0971     @property
0972     def conditions(self):
0973         return self._conditions
0974 
0975     @conditions.setter
0976     def conditions(self, value):
0977         self._conditions = value
0978         conditions_metadata = {}
0979         if self._conditions:
0980             for k in self._conditions:
0981                 conditions_metadata[k] = self._conditions[k].metadata
0982         self.add_metadata_item('conditions', conditions_metadata)
0983 
    @property
    def work_conds(self):
        """Return the value held in ``self._work_conds``."""
        return self._work_conds

    @work_conds.setter
    def work_conds(self, value):
        """Store ``value`` in ``self._work_conds`` (the metadata write is commented out)."""
        self._work_conds = value
        # self.add_metadata_item('work_conds', value)
0992 
0993     def load_work_conditions(self):
0994         conditions_metadata = self.get_metadata_item('conditions', {})
0995         # print("self._conditions: %s" % str(self._conditions))
0996         # print("conditions_metadata: %s" % conditions_metadata)
0997         for cond_internal_id in self._conditions:
0998             if cond_internal_id in conditions_metadata:
0999                 self.conditions[cond_internal_id].metadata = conditions_metadata[cond_internal_id]
1000             if isinstance(self.conditions[cond_internal_id], Workflow):
1001                 # self.conditions[cond_internal_id].load_conditions(self.works)
1002                 pass
1003             elif isinstance(self.conditions[cond_internal_id], Work):
1004                 # self.conditions[cond_internal_id].load_conditions(self.works)
1005                 pass
1006             elif isinstance(self.conditions[cond_internal_id], CompositeCondition):
1007                 self.conditions[cond_internal_id].load_conditions(self.works)
1008                 pass
1009         # work_conds = self.get_metadata_item('work_conds', {})
1010         # self._work_conds = work_conds
1011 
1012     @property
1013     def global_parameters(self):
1014         self._global_parameters = self.get_metadata_item('gp', {})
1015         return self._global_parameters
1016 
1017     @global_parameters.setter
1018     def global_parameters(self, value):
1019         self._global_parameters = value
1020         gp_metadata = {}
1021         if self._global_parameters:
1022             for key in self._global_parameters:
1023                 if key.startswith("user_"):
1024                     gp_metadata[key] = self._global_parameters[key]
1025                 else:
1026                     self.logger.warn("Only parameters start with 'user_' can be set as global parameters. The parameter '%s' will be ignored." % (key))
1027         self.add_metadata_item('gp', gp_metadata)
1028 
    def set_global_parameters(self, value):
        """Method-style setter: assign ``value`` to the ``global_parameters`` property."""
        self.global_parameters = value
1031 
1032     @property
1033     def sliced_global_parameters(self):
1034         self._sliced_global_parameters = self.get_metadata_item('sliced_gp', {})
1035         return self._sliced_global_parameters
1036 
1037     @sliced_global_parameters.setter
1038     def sliced_global_parameters(self, value):
1039         self._sliced_global_parameters = value
1040         gp_metadata = {}
1041         if self._sliced_global_parameters:
1042             for key in self._sliced_global_parameters:
1043                 if key.startswith("user_"):
1044                     gp_metadata[key] = self._sliced_global_parameters[key]
1045                 else:
1046                     self.logger.warn("Only parameters start with 'user_' can be set as global parameters. The parameter '%s' will be ignored." % (key))
1047         self.add_metadata_item('sliced_gp', gp_metadata)
1048 
1049     def set_sliced_global_parameters(self, source, name=None, index=0):
1050         sliced_global_parameters = self.sliced_global_parameters
1051         sliced_global_parameters[source] = {'name': name, 'index': index}
1052         # to trigger the setter function
1053         self.sliced_global_parameters = sliced_global_parameters
1054 
1055     def sync_global_parameters(self, global_parameters, sliced_global_parameters=None):
1056         gp = self.global_parameters
1057         for key in global_parameters:
1058             gp[key] = global_parameters[key]
1059         self.global_parameters = gp
1060 
1061     def sync_global_parameters_from_work(self, work):
1062         self.log_debug("work %s (%s) is_terminated, global_parameters: %s" % (work.get_internal_id(), str(work.metadata),
1063                                                                               str(self.global_parameters)))
1064         if isinstance(work, Work):
1065             if self.global_parameters:
1066                 for key in self.global_parameters:
1067                     status, value = work.get_global_parameter_from_output_data(key)
1068                     self.log_debug("work %s get_global_parameter_from_output_data(key: %s) results(%s:%s)" % (work.get_internal_id(), key, status, value))
1069                     if status:
1070                         self.global_parameters[key] = value
1071                     elif hasattr(work, key):
1072                         self.global_parameters[key] = getattr(work, key)
1073             self.set_global_parameters(self.global_parameters)
1074 
1075     @property
1076     def loop_condition(self):
1077         return self._loop_condition
1078 
1079     @loop_condition.setter
1080     def loop_condition(self, value):
1081         # self._loop_condition_position = position
1082         self._loop_condition = value
1083         if self._loop_condition:
1084             self.add_metadata_item('loop_condition', self._loop_condition.get_condition_status())
1085 
    @property
    def work_sequence(self):
        """Return the 'work_sequence' metadata item, passing {} as the default to get_metadata_item."""
        return self.get_metadata_item('work_sequence', {})

    @work_sequence.setter
    def work_sequence(self, value):
        """Store ``value`` as the 'work_sequence' metadata item."""
        self.add_metadata_item('work_sequence', value)
1093 
    @property
    def terminated_works(self):
        """Return the 'terminated_works' metadata item, passing [] as the default to get_metadata_item."""
        return self.get_metadata_item('terminated_works', [])

    @terminated_works.setter
    def terminated_works(self, value):
        """Store ``value`` as the 'terminated_works' metadata item."""
        self.add_metadata_item('terminated_works', value)
1101 
    @property
    def first_initial(self):
        """Return the 'first_initial' metadata item, passing False as the default to get_metadata_item."""
        return self.get_metadata_item('first_initial', False)

    @first_initial.setter
    def first_initial(self, value):
        """Store ``value`` as the 'first_initial' metadata item."""
        self.add_metadata_item('first_initial', value)
1109 
    @property
    def internal_id_relation_map(self):
        """Return the 'internal_id_relation_map' metadata item, passing False as the default.

        NOTE(review): the default is ``False`` although the name suggests a
        mapping -- confirm callers expect that.
        """
        return self.get_metadata_item('internal_id_relation_map', False)

    @internal_id_relation_map.setter
    def internal_id_relation_map(self, value):
        """Store ``value`` as the 'internal_id_relation_map' metadata item."""
        self.add_metadata_item('internal_id_relation_map', value)
1117 
    @property
    def to_start_works(self):
        """Return the 'to_start_works' metadata item, passing [] as the default to get_metadata_item."""
        return self.get_metadata_item('to_start_works', [])

    @to_start_works.setter
    def to_start_works(self, value):
        """Store ``value`` as the 'to_start_works' metadata item."""
        self.add_metadata_item('to_start_works', value)
1125 
    @property
    def new_to_run_works(self):
        """Return the 'new_to_run_works' metadata item, passing [] as the default to get_metadata_item."""
        return self.get_metadata_item('new_to_run_works', [])

    @new_to_run_works.setter
    def new_to_run_works(self, value):
        """Store ``value`` as the 'new_to_run_works' metadata item."""
        self.add_metadata_item('new_to_run_works', value)
1133 
    @property
    def submitting_works(self):
        """Return the 'submitting_works' metadata item, passing [] as the default to get_metadata_item."""
        return self.get_metadata_item('submitting_works', [])

    @submitting_works.setter
    def submitting_works(self, value):
        """Store ``value`` as the 'submitting_works' metadata item."""
        self.add_metadata_item('submitting_works', value)
1141 
    @property
    def current_running_works(self):
        """Return the 'current_running_works' metadata item, passing [] as the default to get_metadata_item."""
        return self.get_metadata_item('current_running_works', [])

    @current_running_works.setter
    def current_running_works(self, value):
        """Store ``value`` as the 'current_running_works' metadata item."""
        self.add_metadata_item('current_running_works', value)
1149 
    @property
    def num_subfinished_works(self):
        """Return the 'num_subfinished_works' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('num_subfinished_works', 0)

    @num_subfinished_works.setter
    def num_subfinished_works(self, value):
        """Store ``value`` as the 'num_subfinished_works' metadata item."""
        self.add_metadata_item('num_subfinished_works', value)
1157 
    @property
    def num_finished_works(self):
        """Return the 'num_finished_works' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('num_finished_works', 0)

    @num_finished_works.setter
    def num_finished_works(self, value):
        """Store ``value`` as the 'num_finished_works' metadata item."""
        self.add_metadata_item('num_finished_works', value)
1165 
    @property
    def num_failed_works(self):
        """Return the 'num_failed_works' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('num_failed_works', 0)

    @num_failed_works.setter
    def num_failed_works(self, value):
        """Store ``value`` as the 'num_failed_works' metadata item."""
        self.add_metadata_item('num_failed_works', value)
1173 
    @property
    def num_cancelled_works(self):
        """Return the 'num_cancelled_works' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('num_cancelled_works', 0)

    @num_cancelled_works.setter
    def num_cancelled_works(self, value):
        """Store ``value`` as the 'num_cancelled_works' metadata item."""
        self.add_metadata_item('num_cancelled_works', value)
1181 
    @property
    def num_suspended_works(self):
        """Return the 'num_suspended_works' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('num_suspended_works', 0)

    @num_suspended_works.setter
    def num_suspended_works(self, value):
        """Store ``value`` as the 'num_suspended_works' metadata item."""
        self.add_metadata_item('num_suspended_works', value)
1189 
    @property
    def num_expired_works(self):
        """Return the 'num_expired_works' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('num_expired_works', 0)

    @num_expired_works.setter
    def num_expired_works(self, value):
        """Store ``value`` as the 'num_expired_works' metadata item."""
        self.add_metadata_item('num_expired_works', value)
1197 
    @property
    def num_total_works(self):
        """Return the 'num_total_works' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('num_total_works', 0)

    @num_total_works.setter
    def num_total_works(self, value):
        """Store ``value`` as the 'num_total_works' metadata item."""
        self.add_metadata_item('num_total_works', value)
1205 
    @property
    def last_work(self):
        """Return the 'last_work' metadata item, passing None as the default to get_metadata_item."""
        return self.get_metadata_item('last_work', None)

    @last_work.setter
    def last_work(self, value):
        """Store ``value`` as the 'last_work' metadata item."""
        self.add_metadata_item('last_work', value)
1213 
    @property
    def init_works(self):
        """Return the 'init_works' metadata item, passing [] as the default to get_metadata_item."""
        return self.get_metadata_item('init_works', [])

    @init_works.setter
    def init_works(self, value):
        """Store ``value`` as the 'init_works' metadata item."""
        self.add_metadata_item('init_works', value)
1221 
    @property
    def to_update_transforms(self):
        """Return the 'to_update_transforms' metadata item, passing {} as the default to get_metadata_item."""
        return self.get_metadata_item('to_update_transforms', {})

    @to_update_transforms.setter
    def to_update_transforms(self, value):
        """Store ``value`` as the 'to_update_transforms' metadata item."""
        self.add_metadata_item('to_update_transforms', value)
1229 
    @property
    def num_run(self):
        """Return the 'num_run' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('num_run', 0)

    @num_run.setter
    def num_run(self, value):
        """Store ``value`` as the 'num_run' metadata item."""
        self.add_metadata_item('num_run', value)
1237 
    @property
    def parent_num_run(self):
        """Return the 'parent_num_run' metadata item, passing 0 as the default to get_metadata_item."""
        return self.get_metadata_item('parent_num_run', 0)

    @parent_num_run.setter
    def parent_num_run(self, value):
        """Store ``value`` as the 'parent_num_run' metadata item."""
        self.add_metadata_item('parent_num_run', value)
1245 
    @property
    def to_cancel(self):
        """Return the 'to_cancel' metadata item, passing False as the default to get_metadata_item."""
        return self.get_metadata_item('to_cancel', False)

    @to_cancel.setter
    def to_cancel(self, value):
        """Store ``value`` as the 'to_cancel' metadata item."""
        self.add_metadata_item('to_cancel', value)
1253 
    def is_with_steps(self):
        """Return ``self.with_steps`` (set to True by ``split_workflow_to_steps`` when it splits)."""
        return self.with_steps
1256 
    def set_additional_data_storage(self, storage):
        """Store ``storage`` in ``self.additional_data_storage``."""
        self.additional_data_storage = storage
1259 
    def get_additional_data_storage(self):
        """Return ``self.additional_data_storage``."""
        return self.additional_data_storage
1262 
1263     def create_workflow_step(self, batches):
1264         workflow = Workflow()
1265         workflow.set_internal_id(self.get_internal_id())
1266         workflow.campaign = self.campaign
1267         workflow.campaign_scope = self.campaign_scope
1268         workflow.campaign_group = self.campaign_group
1269         workflow.campaign_tag = self.campaign_tag
1270         workflow.max_processing_requests = self.max_processing_requests
1271         workflow.name = self.name
1272         workflow.username = self.username
1273         workflow.userdn = self.userdn
1274         workflow.lifetime = self.lifetime
1275         workflow.workload_id = self.get_workload_id()
1276         workflow.is_workflow_step = True
1277 
1278         data_batches = {}
1279         step_names = []
1280         for name, data_file in batches:
1281             filename = data_file['filename']
1282             with open(filename, 'r') as f:
1283                 data = json.load(f)
1284             data_batches[name] = data
1285             step_names.append(name)
1286         zip_data = self.zip_data(data_batches)
1287         workflow.step_name = ",".join(step_names)
1288         workflow.workflow_data = zip_data
1289 
1290         return workflow
1291 
1292     def split_workflow_to_steps(self, request_cache=None, max_request_length=None):
1293         workflow = self
1294         data_length = len(json_dumps(workflow))
1295         if request_cache and max_request_length and data_length > max_request_length:
1296             logging.info(f"Workflow size {data_length} > max request length {max_request_length}, will split it into steps")
1297             storage = os.path.join(request_cache, self.get_internal_id())
1298             if not os.path.exists(storage):
1299                 os.makedirs(storage, exist_ok=True)
1300             storage_name = "IDDS_WORKFLOW_ADDITIONAL_STORAGE"
1301             # workflow.set_additional_data_storage(storage)
1302             workflow.set_additional_data_storage(storage_name)
1303             data_files = workflow.convert_data_to_additional_data_storage(storage, storage_name=storage_name)
1304             current_size = 0
1305             current_batch = []
1306             workflow_steps = []
1307             for work_name, data_file in data_files.items():
1308                 # filename = data_file['filename']
1309                 # size = data_file['size']
1310                 zip_size = data_file['zip_size']
1311                 if current_size + zip_size <= max_request_length:
1312                     current_batch.append((work_name, data_file))
1313                     current_size += zip_size
1314                 else:
1315                     if current_batch:
1316                         workflow_step = self.create_workflow_step(current_batch)
1317                         workflow_steps.append(workflow_step)
1318                     current_batch = [(work_name, data_file)]
1319                     current_size = zip_size
1320             if current_batch:
1321                 workflow_step = self.create_workflow_step(current_batch)
1322                 workflow_steps.append(workflow_step)
1323 
1324             self.with_steps = True
1325 
1326             return workflow_steps
1327         else:
1328             logging.info(f"Workflow size {data_length} <= max request length {max_request_length} or max request length is not defined, will not split it into steps")
1329         return []
1330 
    def refresh(self):
        """Refresh the object by calling ``refresh_works()`` with its default arguments."""
        self.refresh_works()
1333 
    def load_metadata(self):
        """Restore state from metadata: works first, then work conditions, then parameter links."""
        # print("load_metadata")
        # print(self.__dict__)
        # print("%s loadmetadata" % self.get_internal_id())
        self.load_works()
        self.load_work_conditions()
        self.load_parameter_links()
1341 
    def get_class_name(self):
        """Return the name of this object's class."""
        return self.__class__.__name__
1344 
    def setup_logger(self):
        """
        Set ``self.logger`` to the ``logging`` logger named after this object's class.
        """
        self.logger = logging.getLogger(self.get_class_name())
1350 
    def log_info(self, info):
        """Log ``info`` at INFO level, creating the logger first when it is None."""
        if self.logger is None:
            self.setup_logger()
        self.logger.info(info)
1355 
    def log_debug(self, info):
        """Log ``info`` at DEBUG level, creating the logger first when it is None."""
        if self.logger is None:
            self.setup_logger()
        self.logger.debug(info)
1360 
    def set_internal_id(self, value):
        """Store ``value`` in ``self.internal_id``."""
        self.internal_id = value
1363 
    def get_internal_id(self):
        """Return ``self.internal_id``."""
        return self.internal_id
1366 
    def copy(self):
        """Return a deep copy of this object (made with ``copy.deepcopy``, which uses ``__deepcopy__``)."""
        new_wf = copy.deepcopy(self)
        return new_wf
1370 
1371     def __deepcopy__(self, memo):
1372         logger = self.logger
1373         self.logger = None
1374 
1375         cls = self.__class__
1376         result = cls.__new__(cls)
1377 
1378         memo[id(self)] = result
1379 
1380         # Deep copy all other attributes
1381         for k, v in self.__dict__.items():
1382             setattr(result, k, copy.deepcopy(v, memo))
1383 
1384         self.logger = logger
1385         result.logger = logger
1386         return result
1387 
    def get_works(self):
        """Return the ``works`` property value."""
        return self.works
1390 
1391     def get_combined_num_run(self):
1392         if self.parent_num_run and len(str(self.parent_num_run)):
1393             if self.num_run and len(str(self.num_run)):
1394                 return str(self.parent_num_run) + "_" + str(self.num_run)
1395             else:
1396                 return str(self.parent_num_run)
1397         else:
1398             if self.num_run and len(str(self.num_run)):
1399                 return str(self.num_run)
1400             else:
1401                 return ""
1402 
    def get_new_work_to_run(self, work_id, new_parameters=None):
        """Prepare the work registered under ``work_id`` for running and return it.

        The work's ``workload_id`` is reset to None.  A nested ``Workflow`` is
        synchronised with this object's run number, global parameters and
        cancel flag and is recorded as currently running.  Any other work is
        prepared only when its internal id is not already listed in
        ``new_to_run_works``: it is parameterised, initialised, optionally
        chained to the previously sequenced work and then recorded as new to
        run.  In both cases the work is appended to ``work_sequence`` and
        becomes ``last_work``.

        :param work_id: key of the work in ``self.works``.
        :param new_parameters: NOTE(review): the passed value is never used --
                               it is ignored for a nested workflow and
                               overwritten from ``get_destination_parameters``
                               otherwise.
        :returns: the work object.
        """
        # 1. initialize works
        # template_id = work.get_template_id()
        work = self.works[work_id]
        work.workload_id = None

        if isinstance(work, Workflow):
            work.parent_num_run = self.get_combined_num_run()
            work.sync_global_parameters(self.global_parameters, self.sliced_global_parameters)
            work.sync_works(to_cancel=self.to_cancel)

            work.sequence_id = self.num_total_works

            # re-assign so the ``works`` setter rebuilds the 'works' metadata
            works = self.works
            self.works = works
            # self.work_sequence.append(new_work.get_internal_id())
            # NOTE(review): the in-place updates of work_sequence and
            # current_running_works below presumably rely on
            # get_metadata_item returning the stored object -- confirm.
            self.work_sequence[str(self.num_total_works)] = work.get_internal_id()
            self.num_total_works += 1
            # self.new_to_run_works.append(work.get_internal_id())
            # work.transforming = True
            self.current_running_works.append(work.get_internal_id())
            self.last_work = work.get_internal_id()
        else:
            # skip works that are already queued as new to run
            if work.get_internal_id() not in self.new_to_run_works:
                new_parameters = self.get_destination_parameters(work_id)
                if new_parameters:
                    work.set_parameters(new_parameters)
                work.sequence_id = self.num_total_works

                work.num_run = self.get_combined_num_run()
                work.initialize_work()
                work.sync_global_parameters(self.global_parameters, self.sliced_global_parameters)
                work.renew_parameters_from_attributes()
                # a work without a parent is chained to the previously sequenced work
                if work.parent_workload_id is None and self.num_total_works > 0:
                    last_work_id = self.work_sequence[str(self.num_total_works - 1)]
                    last_work = self.works[last_work_id]
                    if isinstance(last_work, Work):
                        work.parent_workload_id = last_work.workload_id
                    # called whatever the type of the previous entry is
                    last_work.add_next_work(work.get_internal_id())
                # re-assign so the ``works`` setter rebuilds the 'works' metadata
                works = self.works
                self.works = works
                # self.work_sequence.append(new_work.get_internal_id())
                self.work_sequence[str(self.num_total_works)] = work.get_internal_id()
                self.num_total_works += 1
                self.new_to_run_works.append(work.get_internal_id())
                self.last_work = work.get_internal_id()

        return work
1451 
1452     def get_new_parameters_for_work(self, work):
1453         new_parameters = self.get_destination_parameters(work.get_internal_id())
1454         if new_parameters:
1455             work.set_parameters(new_parameters)
1456         work.sequence_id = self.num_total_works
1457 
1458         work.initialize_work()
1459         work.sync_global_parameters(self.global_parameters, self.sliced_global_parameters)
1460         work.renew_parameters_from_attributes()
1461         works = self.works
1462         self.works = works
1463         return work
1464 
1465     def register_user_defined_condition(self, condition):
1466         cond_src = inspect.getsource(condition)
1467         self.user_defined_conditions[condition.__name__] = cond_src
1468 
    def load_user_defined_condition(self):
        """Execute the stored source of every registered user-defined condition.

        SECURITY: the source strings in ``self.user_defined_conditions`` are
        passed to ``exec()``; they must only ever come from a trusted origin.

        NOTE(review): ``exec()`` is called without explicit namespaces inside
        this method, so names defined by the executed source are presumably
        not visible outside this call -- confirm this is the intended effect.
        """
        # try:
        #     Condition()
        # except NameError:
        #     global Condition
        #     import Condition

        for cond_src_name in self.user_defined_conditions:
            # global cond_src_name
            exec(self.user_defined_conditions[cond_src_name])
1479 
    def set_workload_id(self, workload_id):
        """Method-style setter: assign ``workload_id`` to the ``workload_id`` property."""
        self.workload_id = workload_id
1482 
    def get_workload_id(self):
        """Return the ``workload_id`` property value."""
        return self.workload_id
1485 
1486     def get_site(self):
1487         try:
1488             work_id = self.primary_initial_work
1489             if not work_id:
1490                 work_id = list(self.works.keys())[0]
1491             work = self.works[work_id]
1492             return work.get_site()
1493         except Exception:
1494             pass
1495         return None
1496 
1497     def get_cloud(self):
1498         try:
1499             work_id = self.primary_initial_work
1500             if not work_id:
1501                 work_id = list(self.works.keys())[0]
1502             work = self.works[work_id]
1503             return work.get_cloud()
1504         except Exception:
1505             pass
1506         return None
1507 
1508     def get_queue(self):
1509         try:
1510             work_id = self.primary_initial_work
1511             if not work_id:
1512                 work_id = list(self.works.keys())[0]
1513             work = self.works[work_id]
1514             return work.get_queue()
1515         except Exception:
1516             pass
1517         return None
1518 
1519     def add_initial_works(self, work):
1520         self.initial_works.append(work.get_internal_id())
1521         if self.primary_initial_work is None:
1522             self.primary_initial_work = work.get_internal_id()
1523 
1524     def add_work(self, work, initial=False, primary=False):
1525         self.first_initial = False
1526         self.works[work.get_internal_id()] = work
1527         if initial:
1528             if primary:
1529                 self.primary_initial_work = work.get_internal_id()
1530             self.add_initial_works(work)
1531 
1532         self.independent_works.append(work.get_internal_id())
1533 
1534     def add_build_work(self, work, initial=False, primary=False):
1535         self.build_work = work
1536         self.build_work.set_build_work()
1537 
1538     def convert_data_to_additional_data_storage(self, storage, storage_name=None, replace_storage_name=False):
1539         data_files = {}
1540         for work_id in self.works.keys():
1541             work = self.works[work_id]
1542             data_file = work.convert_data_to_additional_data_storage(storage, storage_name=storage_name, replace_storage_name=replace_storage_name)
1543             if data_file:
1544                 work_name, filename, size, zip_size = data_file
1545                 data_files[work_name] = {'filename': filename, 'size': size, 'zip_size': zip_size}
1546             self.works[work_id] = work
1547         return data_files
1548 
1549     def get_build_work(self):
1550         return self.build_work
1551 
1552     def has_to_build_work(self):
1553         if self.build_work is not None:
1554             return True
1555         return False
1556 
1557     def add_condition(self, cond):
1558         self.first_initial = False
1559         cond_works = cond.all_works()
1560         for cond_work in cond_works:
1561             assert (cond_work.get_internal_id() in self.get_works())
1562 
1563         conditions = self.conditions
1564         conditions[cond.get_internal_id()] = cond
1565         self.conditions = conditions
1566 
1567         # if cond.current_work not in self.work_conds:
1568         #     self.work_conds[cond.current_work] = []
1569         # self.work_conds[cond.current_work].append(cond)
1570         work_conds = self.work_conds
1571         for work in cond.all_pre_works():
1572             if work.get_internal_id() not in work_conds:
1573                 work_conds[work.get_internal_id()] = []
1574             work_conds[work.get_internal_id()].append(cond.get_internal_id())
1575         self.work_conds = work_conds
1576 
1577         # if a work is a true_work or false_work of a condition,
1578         # should remove it from independent_works
1579         cond_next_works = cond.all_next_works()
1580         for next_work in cond_next_works:
1581             if next_work.get_internal_id() in self.independent_works:
1582                 self.independent_works.remove(next_work.get_internal_id())
1583 
1584     def find_workflow_from_work(self, work):
1585         if work.get_internal_id() in self._works:
1586             return self
1587         else:
1588             for k in self._works:
1589                 wk = self._works[k]
1590                 if isinstance(wk, Workflow):
1591                     wf = wk.find_workflow_from_work(work)
1592                     if wf:
1593                         return wf
1594         return None
1595 
1596     def add_parameter_link(self, work_source, work_destinations, parameter_link):
1597         wf_s = self.find_workflow_from_work(work_source)
1598         if not wf_s:
1599             raise Exception("Cannot find work %s in the workflow." % work_source.get_internal_id())
1600         if work_source.get_internal_id() not in wf_s.parameter_links_source:
1601             wf_s.parameter_links_source[work_source.get_internal_id()] = []
1602         wf_s.parameter_links_source[work_source.get_internal_id()].append(parameter_link.get_internal_id())
1603 
1604         if type(work_destinations) not in [list, tuple]:
1605             work_destinations = [work_destinations]
1606         for work_destination in work_destinations:
1607             wf = self.find_workflow_from_work(work_destination)
1608             if not wf:
1609                 raise Exception("Cannot find work %s in the workflow." % work_destination.get_internal_id())
1610             if parameter_link.get_internal_id() not in wf.parameter_links:
1611                 wf.parameter_links[parameter_link.get_internal_id()] = parameter_link
1612                 if work_destination.get_internal_id() not in wf.parameter_links_destination:
1613                     wf.parameter_links_destination[work_destination.get_internal_id()] = []
1614                 wf.parameter_links_destination[work_destination.get_internal_id()].append(parameter_link.get_internal_id())
1615 
1616     def find_parameter_links_from_id(self, internal_id):
1617         rets = []
1618         if internal_id in self.parameter_links:
1619             rets.append((self, self.parameter_links[internal_id]))
1620         for k in self._works:
1621             wk = self._works[k]
1622             if isinstance(wk, Workflow):
1623                 links = wk.find_parameter_links_from_id(internal_id)
1624                 rets = rets + links
1625         return rets
1626 
1627     def refresh_parameter_links(self):
1628         p_metadata = {}
1629         for internal_id in self.parameter_links:
1630             p_metadata[internal_id] = self.parameter_links[internal_id].metadata
1631         self.add_metadata_item('parameter_links', p_metadata)
1632 
1633     def get_parameter_links_metadata(self):
1634         p_metadata = {}
1635         for internal_id in self.parameter_links:
1636             p_metadata[internal_id] = self.parameter_links[internal_id].metadata
1637         self.add_metadata_item('parameter_links', p_metadata)
1638         return p_metadata
1639 
1640     def set_parameter_links_metadata(self, p_links):
1641         for internal_id in self.parameter_links:
1642             if internal_id in p_links:
1643                 p_metadata = p_links[internal_id]
1644                 self.parameter_links[internal_id].metadata = p_metadata
1645 
1646     def set_source_parameters(self, internal_id):
1647         work = self.works[internal_id]
1648         # if type(work) in [Work]:
1649         #     print(work.work_id)
1650         #     print(internal_id)
1651         #     print(self.parameter_links_source)
1652         if internal_id in self.parameter_links_source:
1653             for p_id in self.parameter_links_source[internal_id]:
1654                 # print(p_id)
1655                 p_links = self.find_parameter_links_from_id(p_id)
1656                 # print(p_links)
1657                 for wf, p_link in p_links:
1658                     p_link.set_parameters(work)
1659                     wf.refresh_parameter_links()
1660 
1661     def get_destination_parameters(self, internal_id):
1662         # work = self.works[internal_id]
1663         parameters = {}
1664         if internal_id in self.parameter_links_destination:
1665             for p_id in self.parameter_links_destination[internal_id]:
1666                 p_link = self.parameter_links[p_id]
1667                 parameters.update(p_link.get_parameters())
1668         return parameters
1669 
1670     def load_parameter_links(self):
1671         p_metadata = self.get_metadata_item('parameter_links', {})
1672         for p_id in self.parameter_links:
1673             if p_id in p_metadata:
1674                 self.parameter_links[p_id].metadata = p_metadata[p_id]
1675 
1676     def add_next_work(self, work_id):
1677         next_works = self.next_works
1678         next_works.append(work_id)
1679         self.next_works = next_works
1680 
1681     def enable_next_works(self, work, cond):
1682         # self.log_debug("works: %s" % str(self.works))
1683         # self.log_debug("Checking Work %s condition: %s" % (work.get_internal_id(),
1684         #                                                    json_dumps(cond, sort_keys=True, indent=4)))
1685         # self.log_debug("Checking Work %s condition: %s" % (work.get_internal_id(), cond.get_internal_id()))
1686 
1687         # load_conditions should cover it.
1688         # if cond and self.is_class_method(cond.cond):
1689         #     # cond_work_id = self.works[cond.cond['idds_method_class_id']]
1690         #     cond.cond = getattr(work, cond.cond['idds_method'])
1691 
1692         self.log_info("Work %s condition: %s" % (work.get_internal_id(), cond.conditions))
1693         next_works = cond.get_next_works(trigger=ConditionTrigger.ToTrigger)
1694         self.log_info("Work %s condition status %s" % (work.get_internal_id(), cond.get_cond_status()))
1695         self.log_info("Work %s next works %s" % (work.get_internal_id(), str(next_works)))
1696         new_next_works = []
1697         if next_works is not None:
1698             for next_work in next_works:
1699                 self.log_info(f"next work {next_work.get_internal_id()} is_submitted: {next_work.submitted}, transforming: {next_work.transforming}")
1700                 # parameters = self.get_destination_parameters(next_work.get_internal_id())
1701                 new_next_work = self.get_new_work_to_run(next_work.get_internal_id())
1702                 work.add_next_work(new_next_work.get_internal_id())
1703                 if isinstance(work, Work):
1704                     new_next_work.parent_workload_id = work.workload_id
1705                 # cond.add_condition_work(new_next_work)   ####### TODO:
1706                 new_next_works.append(new_next_work)
1707             return new_next_works
1708 
1709     def add_loop_condition(self, condition, position='end'):
1710         self.loop_condition_position = position
1711         self.loop_condition = condition
1712 
1713     def has_loop_condition(self):
1714         if self.loop_condition:
1715             return True
1716         return False
1717 
1718     def get_loop_condition_status(self):
1719         if self.has_loop_condition():
1720             self.loop_condition.load_conditions(self.works)
1721             # self.logger.debug("Loop condition %s" % (json_dumps(self.loop_condition, sort_keys=True, indent=4)))
1722             return self.loop_condition.get_condition_status()
1723         return False
1724 
1725     def __str__(self):
1726         return str(json_dumps(self))
1727 
1728     def format_works(self, works):
1729         if not works:
1730             return works
1731         all_works = self.get_all_works(synchronize=False)
1732         task_name_to_internal_id_map = {}
1733         for work in all_works:
1734             task_name_to_internal_id_map[work.task_name] = work.get_internal_id()
1735 
1736         for work in works:
1737             parent_task_names = work.get_ancestry_works()
1738             parent_internal_ids = []
1739             if parent_task_names:
1740                 own_internal_id = work.get_internal_id()
1741                 parent_internal_ids = [task_name_to_internal_id_map[t_name]
1742                                        for t_name in parent_task_names
1743                                        if task_name_to_internal_id_map.get(t_name) != own_internal_id]
1744             work.parent_internal_ids = parent_internal_ids
1745         return works
1746 
    def get_new_works(self, synchronize=True):
        """
        *** Function called by Marshaller agent.

        Return the new works that are ready to start.

        :param synchronize: when true, ``sync_works`` is called first.
        :returns: the collected works passed through ``format_works``; an
                  empty list while ``self.submitting_works`` is not empty.
        """
        self.logger.info(f"{self.get_internal_id()} get_new_works, to_cancel: {self.to_cancel}")
        # if self.to_cancel:
        #     return []

        if synchronize:
            self.sync_works(to_cancel=self.to_cancel)
        works = []

        self.logger.info(f"{self.get_internal_id()} submitting_works: {self.submitting_works}")
        if self.submitting_works:
            # wait the work to be submitted
            return self.format_works(works)

        if self.to_start_works:
            self.logger.info("%s to_start_works: %s" % (self.get_internal_id(), str(self.to_start_works)))
            # work on a copy: self.to_start_works is modified in the loop below
            to_start_works = self.to_start_works.copy()
            init_works = self.init_works
            starting_works = []
            for work_id in to_start_works:
                # here release all works, then
                # let the transformer check the parent_internal_id
                # if not self.works[work_id].has_dependency():
                #     starting_works.append(work_id)
                starting_works.append(work_id)
            # every id is released above, so starting_works is only empty here
            # when to_start_works is empty
            if not starting_works:
                work_id = to_start_works.pop(0)
                starting_works.append(work_id)
            for work_id in starting_works:
                self.get_new_work_to_run(work_id)
                # only the first started work is recorded in init_works
                if not init_works:
                    init_works.append(work_id)
                    self.init_works = init_works
                if work_id in self.to_start_works:
                    self.to_start_works.remove(work_id)
            self.logger.info("%s starting_works: %s" % (self.get_internal_id(), str(starting_works)))

        # new_workflows = []
        self.logger.info(f"{self.get_internal_id()} new_to_run_works: {self.new_to_run_works}")
        for k in self.new_to_run_works:
            if isinstance(self.works[k], Work):
                # a plain work: refresh its parameters and hand it out
                self.works[k] = self.get_new_parameters_for_work(self.works[k])
                works.append(self.works[k])
            if isinstance(self.works[k], Workflow):
                # a sub-workflow: collect its own new works and mark it as transforming
                works = works + self.works[k].get_new_works(synchronize=False)
                self.works[k].transforming = True
                # self.current_running_works.append(k)
                # new_workflows.append(k)
        # sub-workflows that are already running may also have new works
        for k in self.current_running_works:
            if isinstance(self.works[k], Workflow):
                works = works + self.works[k].get_new_works(synchronize=False)
        # for k in new_workflows:
        #     self.current_running_works.append(k)
        self.logger.info("%s get_new_works done" % self.get_internal_id())

        # make sure internal_id is loaded
        internal_id_relation_map = self.internal_id_relation_map
        self.logger.debug(f"internal_id_relation_map: {internal_id_relation_map}")
        if internal_id_relation_map and works:
            for i, w in enumerate(works):
                if not works[i].parent_internal_id:
                    # note: this message is logged before the assignment, so it
                    # shows the old (falsy) parent_internal_id
                    self.logger.debug(f"internal_id {works[i].internal_id} loads parent_internal_id {works[i].parent_internal_id}")
                    works[i].parent_internal_id = internal_id_relation_map.get(works[i].internal_id, None)
        return self.format_works(works)
1816 
1817     def get_current_works(self):
1818         """
1819         *** Function called by Marshaller agent.
1820 
1821         Current running works
1822         """
1823         self.sync_works(to_cancel=self.to_cancel)
1824         works = []
1825         for k in self.current_running_works:
1826             if isinstance(self.works[k], Work):
1827                 works.append(self.works[k])
1828             if isinstance(self.works[k], Workflow):
1829                 works = works + self.works[k].get_current_works()
1830         return works
1831 
1832     def get_all_works(self, synchronize=True):
1833         """
1834         *** Function called by Marshaller agent.
1835 
1836         Current running works
1837         """
1838         self.logger.info("%s get_all_works" % self.get_internal_id())
1839         if synchronize:
1840             self.sync_works(to_cancel=self.to_cancel)
1841 
1842         works = []
1843         for k in self.works:
1844             if isinstance(self.works[k], Work):
1845                 works.append(self.works[k])
1846             if isinstance(self.works[k], Workflow):
1847                 works = works + self.works[k].get_all_works(synchronize=False)
1848         self.logger.info("%s get_all_works done" % self.get_internal_id())
1849         return works
1850 
1851     def get_primary_initial_collection(self):
1852         """
1853         *** Function called by Clerk agent.
1854         """
1855 
1856         if self.primary_initial_work:
1857             if isinstance(self.get_works()[self.primary_initial_work], Workflow):
1858                 return self.get_works()[self.primary_initial_work].get_primary_initial_collection()
1859             else:
1860                 return self.get_works()[self.primary_initial_work].get_primary_input_collection()
1861         elif self.initial_works:
1862             if isinstance(self.get_works()[self.initial_works[0]], Workflow):
1863                 return self.get_works()[self.initial_works[0]].get_primary_initial_collection()
1864             else:
1865                 return self.get_works()[self.initial_works[0]].get_primary_input_collection()
1866         elif self.independent_works:
1867             if isinstance(self.get_works()[self.independent_works[0]], Workflow):
1868                 return self.get_works()[self.independent_works[0]].get_primary_initial_collection()
1869             else:
1870                 return self.get_works()[self.independent_works[0]].get_primary_input_collection()
1871         else:
1872             keys = self.get_works().keys()
1873             if isinstance(self.get_works()[keys[0]], Workflow):
1874                 return self.get_works()[keys[0]].get_primary_initial_collection()
1875             else:
1876                 return self.get_works()[keys[0]].get_primary_input_collection()
1877         return None
1878 
1879     def get_dependency_works(self, work_id, depth, max_depth):
1880         if depth > max_depth:
1881             return []
1882 
1883         deps = []
1884         for dep_work_id in self.work_dependencies[work_id]:
1885             deps.append(dep_work_id)
1886             l_deps = self.get_dependency_works(dep_work_id, depth + 1, max_depth)
1887             deps += l_deps
1888         deps = list(dict.fromkeys(deps))
1889         return deps
1890 
    def order_independent_works(self):
        """
        Reorder ``self.independent_works`` so that works come after the works
        they depend on, and chain them through ``parent_internal_id``.

        Dependencies are taken from each work's ``get_ancestry_works()`` (task
        names, translated to internal ids). The method rewrites
        ``self.independent_works``, uses ``self.work_dependencies`` as working
        state and stores the resulting id -> parent id mapping in
        ``self.internal_id_relation_map``.
        """
        self.log_debug("ordering independent works")
        ind_work_ids = self.independent_works
        self.log_debug("independent works: %s" % (str(ind_work_ids)))
        # both are rebuilt from scratch below
        self.independent_works = []
        self.work_dependencies = {}

        # task name -> internal id, over all works including those of sub-workflows
        all_works = self.get_all_works(synchronize=False)
        task_name_to_internal_id_map = {}
        for work in all_works:
            task_name_to_internal_id_map[work.task_name] = work.get_internal_id()

        # record for each independent work the internal ids it depends on
        for ind_work_id in ind_work_ids:
            work = self.works[ind_work_id]
            parent_task_names = work.get_ancestry_works()
            self.work_dependencies[ind_work_id] = []
            if parent_task_names:
                # a parent task name missing from the map raises KeyError here
                parent_internal_ids = [task_name_to_internal_id_map[t_name] for t_name in parent_task_names]
                self.work_dependencies[ind_work_id] = parent_internal_ids
        self.log_debug('work dependencies: %s' % str(self.work_dependencies))

        # repeat until every work is ordered or a pass changes nothing
        while True:
            # self.log_debug('independent_works N: %s' % str(self.independent_works))
            # self.log_debug('work dependencies N: %s' % str(self.work_dependencies))
            has_changes = False
            # 1) works without remaining dependencies are ready to be ordered
            for work_id in self.work_dependencies:
                if work_id not in self.independent_works and len(self.work_dependencies[work_id]) == 0:
                    self.independent_works.append(work_id)
                    has_changes = True
            # 2) ordered works no longer need a dependency entry
            for work_id in self.independent_works:
                if work_id in self.work_dependencies:
                    del self.work_dependencies[work_id]
                    has_changes = True
            # 3) ordered works are removed from the dependencies of the remaining works
            for work_id in self.work_dependencies:
                for in_work_id in self.independent_works:
                    if in_work_id in self.work_dependencies[work_id]:
                        self.work_dependencies[work_id].remove(in_work_id)
                        has_changes = True
            if not self.work_dependencies:
                break
            if not has_changes:
                # nothing could be resolved in this pass: the remaining works are
                # appended in their current order so that none is lost
                self.log_debug("There are loop dependencies between works.")
                self.log_debug('independent_works N: %s' % str(self.independent_works))
                self.log_debug('work dependencies N: %s' % str(self.work_dependencies))
                for work_id in self.work_dependencies:
                    if work_id not in self.independent_works:
                        self.independent_works.append(work_id)
                break
        self.log_debug('independent_works: %s' % str(self.independent_works))
        self.log_debug("ordered independent works")

        # chain the ordered works: each work gets the previous one as parent
        pre_work = None
        internal_id_relation_map = {}
        for work_id in self.independent_works:
            work = self.works[work_id]
            if pre_work is not None:
                work.parent_internal_id = pre_work.internal_id
            pre_work = work
            internal_id_relation_map[work.internal_id] = work.parent_internal_id
        self.internal_id_relation_map = internal_id_relation_map
1951 
1952     def first_initialize(self):
1953         # set new_to_run works
1954         if not self.first_initial:
1955             self.log_debug("first initializing")
1956             self.first_initial = True
1957             self.order_independent_works()
1958             if self.initial_works:
1959                 tostart_works = self.initial_works
1960             elif self.independent_works:
1961                 tostart_works = self.independent_works
1962             else:
1963                 tostart_works = list(self.get_works().keys())
1964                 tostart_works = [tostart_works[0]]
1965 
1966             to_start_works = self.to_start_works
1967             for work_id in tostart_works:
1968                 to_start_works.append(work_id)
1969             self.to_start_works = to_start_works
1970             self.log_debug("first initialized")
1971 
1972     def sync_works(self, to_cancel=False):
1973         if to_cancel:
1974             self.to_cancel = to_cancel
1975         self.log_debug("%s num_run %s synchroning works" % (self.get_internal_id(), self.num_run))
1976         self.first_initialize()
1977 
1978         self.refresh_works()
1979 
1980         for k in self.works:
1981             work = self.works[k]
1982             self.log_debug("work %s is_terminated(%s:%s), is_submitted: %s, transforming: %s" % (work.get_internal_id(),
1983                                                                                                  work.is_terminated(synchronize=False),
1984                                                                                                  work.get_status(),
1985                                                                                                  work.submitted,
1986                                                                                                  work.transforming))
1987             if work.get_internal_id() not in self.current_running_works and work.get_status() in [WorkStatus.Transforming]:
1988                 self.current_running_works.append(work.get_internal_id())
1989 
1990         self.log_debug(f"{self.get_internal_id()} current_running_works: {self.current_running_works}")
1991         self.log_debug(f"{self.get_internal_id()} new_to_run_works: {self.new_to_run_works}")
1992         self.log_debug(f"{self.get_internal_id()} submitting_works: {self.submitting_works}")
1993         submitting_works = self.submitting_works
1994         for work in [self.works[k] for k in self.new_to_run_works]:
1995             if work.transforming:
1996                 self.new_to_run_works.remove(work.get_internal_id())
1997                 submitting_works.append(work.get_internal_id())
1998                 if work.get_internal_id() not in self.current_running_works:
1999                     self.current_running_works.append(work.get_internal_id())
2000         self.submitting_works = submitting_works
2001         for work in [self.works[k] for k in self.submitting_works]:
2002             self.log_info("Work %s is_submitted(%s)" % (work.get_internal_id(), work.submitted))
2003             if work.submitted:
2004                 self.submitting_works.remove(work.get_internal_id())
2005 
2006         # for work in [self.works[k] for k in self.current_running_works]:
2007         for k in self.current_running_works:
2008             work = self.works[k]
2009             if isinstance(work, Workflow):
2010                 work.sync_works(to_cancel=self.to_cancel)
2011 
2012             if work.is_terminated(synchronize=False):
2013                 self.log_debug("work %s is_terminated, sync_global_parameters_from_work" % (work.get_internal_id()))
2014                 self.set_source_parameters(work.get_internal_id())
2015                 self.sync_global_parameters_from_work(work)
2016 
2017                 self.log_info("Work %s num_run %s is terminated(%s)" % (work.get_internal_id(), self.num_run, work.get_status()))
2018                 # self.log_debug("Work conditions: %s" % json_dumps(self.work_conds, sort_keys=True, indent=4))
2019                 if work.get_internal_id() not in self.work_conds:
2020                     # has no next work
2021                     self.log_info("Work %s has no condition dependencies" % work.get_internal_id())
2022                     if work.get_internal_id() not in self.terminated_works:
2023                         self.terminated_works.append(work.get_internal_id())
2024                     if work.get_internal_id() in self.current_running_works:
2025                         self.current_running_works.remove(work.get_internal_id())
2026                 else:
2027                     # self.log_debug("Work %s has condition dependencies %s" % (work.get_internal_id(),
2028                     #                                                           json_dumps(self.work_conds[work.get_template_id()], sort_keys=True, indent=4)))
2029                     # for cond in self.work_conds[work.get_template_id()]:
2030                     #     self.enable_next_works(work, cond)
2031                     if work.get_internal_id() not in self.terminated_works:
2032                         self.terminated_works.append(work.get_internal_id())
2033                     if work.get_internal_id() in self.current_running_works:
2034                         self.current_running_works.remove(work.get_internal_id())
2035 
2036         for k in self.work_conds:
2037             work = self.works[k]
2038             # self.log_debug("Work %s has condition dependencies %s" % (work.get_internal_id(),
2039             #                                                           json_dumps(self.work_conds[work.get_internal_id()], sort_keys=True, indent=4)))
2040             self.log_debug("Work %s has condition dependencies %s" % (work.get_internal_id(), self.work_conds[work.get_internal_id()]))
2041             for cond_id in self.work_conds[work.get_internal_id()]:
2042                 cond = self.conditions[cond_id]
2043                 # self.log_debug("Work %s has condition dependencie %s" % (work.get_internal_id(),
2044                 #                                                          json_dumps(cond, sort_keys=True, indent=4)))
2045                 self.log_debug("Work %s has condition dependencie %s" % (work.get_internal_id(), cond.get_internal_id()))
2046 
2047                 self.enable_next_works(work, cond)
2048 
2049         self.num_finished_works = 0
2050         self.num_subfinished_works = 0
2051         self.num_failed_works = 0
2052         self.num_expired_works = 0
2053         self.num_cancelled_works = 0
2054         self.num_suspended_works = 0
2055 
2056         for k in self.works:
2057             work = self.works[k]
2058             if work.is_terminated():
2059                 if work.is_finished(synchronize=False):
2060                     self.num_finished_works += 1
2061                 elif work.is_subfinished(synchronize=False):
2062                     self.num_subfinished_works += 1
2063                 elif work.is_failed(synchronize=False):
2064                     self.num_failed_works += 1
2065                 elif work.is_expired(synchronize=False):
2066                     self.num_expired_works += 1
2067                 elif work.is_cancelled(synchronize=False):
2068                     self.num_cancelled_works += 1
2069                 elif work.is_suspended(synchronize=False):
2070                     self.num_suspended_works += 1
2071 
2072             # if work.is_terminated():
2073             #    # if it's a loop workflow, to generate new loop
2074             #    if isinstance(work, Workflow):
2075             #        work.sync_works()
2076         log_str = "%s num_run %s num_total_works: %s" % (self.get_internal_id(), self.num_run, self.num_total_works)
2077         log_str += ", num_finished_works: %s" % self.num_finished_works
2078         log_str += ", num_subfinished_works: %s" % self.num_subfinished_works
2079         log_str += ", num_failed_works: %s" % self.num_failed_works
2080         log_str += ", num_expired_works: %s" % self.num_expired_works
2081         log_str += ", num_cancelled_works: %s" % self.num_cancelled_works
2082         log_str += ", num_suspended_works: %s" % self.num_suspended_works
2083         log_str += ", new_to_run_works: %s" % len(self.new_to_run_works)
2084         log_str += ", current_running_works: %s" % len(self.current_running_works)
2085         self.log_debug(log_str)
2086 
2087         self.refresh_works()
2088         self.log_debug("%s synchronized works" % self.get_internal_id())
2089 
2090     def resume_works(self):
2091         self.to_cancel = False
2092         self.num_subfinished_works = 0
2093         self.num_finished_works = 0
2094         self.num_failed_works = 0
2095         self.num_cancelled_works = 0
2096         self.num_suspended_works = 0
2097         self.num_expired_works = 0
2098 
2099         self.last_updated_at = datetime.datetime.utcnow()
2100 
2101         t_works = self.terminated_works
2102         self.terminated_works = []
2103         self.current_running_works = self.current_running_works + t_works
2104         for work in [self.works[k] for k in self.current_running_works]:
2105             if isinstance(work, Workflow):
2106                 work.resume_works()
2107             else:
2108                 work.resume_work()
2109 
2110     def get_relation_data(self, work):
2111         ret = {'work': {'workload_id': work.workload_id,
2112                         'external_id': work.external_id,
2113                         'work_name': work.get_work_name()}}
2114         if hasattr(work, 'get_ancestry_works'):
2115             ret['work']['ancestry_works'] = work.get_ancestry_works()
2116 
2117         next_works = work.next_works
2118         if next_works:
2119             next_works_data = []
2120             for next_id in next_works:
2121                 next_work = self.works[next_id]
2122                 if isinstance(next_work, Workflow):
2123                     next_work_data = next_work.get_relation_map()
2124                 else:
2125                     next_work_data = self.get_relation_data(next_work)
2126                 next_works_data.append(next_work_data)
2127             ret['next_works'] = next_works_data
2128         return ret
2129 
2130     def organzie_based_on_ancestry_works(self, works):
2131         new_ret = []
2132 
2133         ordered_items = {}
2134         left_items = []
2135         for item in works:
2136             if type(item) in [dict]:
2137                 if 'ancestry_works' not in item['work'] or not item['work']['ancestry_works']:
2138                     new_ret.append(item)
2139                     ordered_items[item['work']['work_name']] = item
2140                 else:
2141                     # ancestry_works = item['work']['ancestry_works']
2142                     left_items.append(item)
2143             elif type(item) in [list]:
2144                 # subworkflow
2145                 # work_names, ancestry_works = self.get_workflow_ancestry_works(item)
2146                 # if not ancestry_works:
2147                 #     new_ret.append(item)
2148                 # currently now support to use dependency_map to depend_on a workflow.
2149                 # depending on a workflow should use Condition. It's already processed.
2150                 new_ret.append(item)
2151         while True:
2152             new_left_items = left_items
2153             left_items = []
2154             has_updates = False
2155             for item in new_left_items:
2156                 ancestry_works = item['work']['ancestry_works']
2157                 all_ancestry_ready = True
2158                 for work_name in ancestry_works:
2159                     if work_name not in ordered_items and work_name != item['work']['work_name']:
2160                         all_ancestry_ready = False
2161                 if all_ancestry_ready:
2162                     for work_name in ancestry_works:
2163                         if work_name != item['work']['work_name']:
2164                             if 'next_works' not in ordered_items[work_name]:
2165                                 ordered_items[work_name]['next_works'] = [item]
2166                             else:
2167                                 ordered_items[work_name]['next_works'].append(item)
2168                             has_updates = True
2169                             ordered_items[item['work']['work_name']] = item
2170                 else:
2171                     left_items.append(item)
2172             if not has_updates or not left_items:
2173                 break
2174         for item in left_items:
2175             new_ret.append(item)
2176         return new_ret
2177 
2178     def get_relation_map(self):
2179         ret = []
2180         init_works = self.init_works
2181         for internal_id in init_works:
2182             if isinstance(self.works[internal_id], Workflow):
2183                 work_data = self.works[internal_id].get_relation_map()
2184             else:
2185                 work_data = self.get_relation_data(self.works[internal_id])
2186             ret.append(work_data)
2187         ret = self.organzie_based_on_ancestry_works(ret)
2188         return ret
2189 
2190     def clean_works(self):
2191         self.num_subfinished_works = 0
2192         self.num_finished_works = 0
2193         self.num_failed_works = 0
2194         self.num_cancelled_works = 0
2195         self.num_suspended_works = 0
2196         self.num_expired_works = 0
2197         self.num_total_works = 0
2198 
2199         self.last_updated_at = datetime.datetime.utcnow()
2200         if self.build_work:
2201             self.build_work.clean_works()
2202 
2203         self.terminated_works = []
2204         self.current_running_works = []
2205         # self.works = {}
2206         # self.work_sequence = {}  # order list
2207 
2208         self.first_initial = False
2209         self.new_to_run_works = []
2210 
2211     def get_exact_workflows(self):
2212         """
2213         *** Function called by Clerk agent.
2214 
2215         TODO: The primary dataset for the initial work is a dataset with '*'.
2216         workflow.primary_initial_collection = 'some datasets with *'
2217         collections = get_collection(workflow.primary_initial_collection)
2218         wfs = []
2219         for coll in collections:
2220             wf = self.copy()
2221             wf.name = self.name + "_" + number
2222             wf.primary_initial_collection = coll
2223             wfs.append(wf)
2224         return wfs
2225         """
2226         return [self]
2227 
2228     def is_terminated(self, synchronize=True, new=False):
2229         """
2230         *** Function called by Marshaller agent.
2231         """
2232         if synchronize:
2233             self.sync_works(to_cancel=self.to_cancel)
2234         if self.to_cancel:
2235             if new:
2236                 if len(self.new_to_run_works) == 0 and len(self.current_running_works) == 0 and self.num_total_works > 0:
2237                     return True
2238             else:
2239                 if len(self.current_running_works) == 0:
2240                     return True
2241         else:
2242             num_total_works = self.num_finished_works + self.num_subfinished_works + self.num_failed_works
2243             num_total_works += self.num_expired_works + self.num_cancelled_works + self.num_suspended_works
2244             num_total_works += len(self.new_to_run_works) + len(self.current_running_works)
2245             if self.num_total_works > 0 and len(self.new_to_run_works) == 0 and len(self.current_running_works) == 0 and num_total_works == self.num_total_works:
2246                 return True
2247         return False
2248 
2249     def is_finished(self, synchronize=True):
2250         """
2251         *** Function called by Marshaller agent.
2252         """
2253         return self.is_terminated(synchronize=synchronize) and self.num_finished_works == self.num_total_works and (self.num_total_works > 0)
2254 
2255     def is_subfinished(self, synchronize=True):
2256         """
2257         *** Function called by Marshaller agent.
2258         """
2259         return self.is_terminated(synchronize=synchronize) and (self.num_finished_works + self.num_subfinished_works > 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works)
2260 
2261     def is_processed(self, synchronize=True):
2262         """
2263         *** Function called by Transformer agent.
2264         """
2265         return self.is_terminated(synchronize=synchronize) and (self.num_finished_works + self.num_subfinished_works > 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works)
2266 
2267     def is_failed(self, synchronize=True):
2268         """
2269         *** Function called by Marshaller agent.
2270         """
2271         return self.is_terminated(synchronize=synchronize) and (self.num_failed_works > 0) and (self.num_cancelled_works == 0) and (self.num_suspended_works == 0) and (self.num_expired_works == 0)
2272 
2273     def is_to_expire(self, expired_at=None, pending_time=None, request_id=None):
2274         if self.expired:
2275             # it's already expired. avoid sending duplicated messages again and again.
2276             return False
2277         if expired_at:
2278             if type(expired_at) in [str]:
2279                 expired_at = str_to_date(expired_at)
2280             if expired_at < datetime.datetime.utcnow():
2281                 self.logger.info("Request(%s) expired_at(%s) is smaller than utc now(%s), expiring" % (request_id,
2282                                                                                                        expired_at,
2283                                                                                                        datetime.datetime.utcnow()))
2284                 return True
2285 
2286         act_pending_time = None
2287         if self.pending_time:
2288             # in days
2289             act_pending_time = float(self.pending_time)
2290         else:
2291             if pending_time:
2292                 act_pending_time = float(pending_time)
2293         if act_pending_time:
2294             act_pending_seconds = int(86400 * act_pending_time)
2295             if self.last_updated_at + datetime.timedelta(seconds=act_pending_seconds) < datetime.datetime.utcnow():
2296                 log_str = "Request(%s) last updated at(%s) + pending seconds(%s)" % (request_id,
2297                                                                                      self.last_updated_at,
2298                                                                                      act_pending_seconds)
2299                 log_str += " is smaller than utc now(%s), expiring" % (datetime.datetime.utcnow())
2300                 self.logger.info(log_str)
2301                 return True
2302 
2303         return False
2304 
2305     def is_expired(self, synchronize=True):
2306         """
2307         *** Function called by Marshaller agent.
2308         """
2309         # return self.is_terminated() and (self.num_expired_works > 0)
2310         return self.is_terminated(synchronize=synchronize) and self.expired
2311 
2312     def is_cancelled(self, synchronize=True):
2313         """
2314         *** Function called by Marshaller agent.
2315         """
2316         return self.is_terminated(synchronize=synchronize) and (self.num_cancelled_works > 0)
2317 
2318     def is_suspended(self, synchronize=True):
2319         """
2320         *** Function called by Marshaller agent.
2321         """
2322         return self.is_terminated(synchronize=synchronize) and (self.num_suspended_works > 0)
2323 
2324     def get_terminated_msg(self):
2325         """
2326         *** Function called by Marshaller agent.
2327         """
2328         if self.last_work:
2329             return self.works[self.last_work].get_terminated_msg()
2330         return None
2331 
2332     def get_status(self):
2333         if self.is_terminated():
2334             if self.is_finished(synchronize=False):
2335                 return WorkStatus.Finished
2336             elif self.is_subfinished(synchronize=False):
2337                 return WorkStatus.SubFinished
2338             elif self.is_failed(synchronize=False):
2339                 return WorkStatus.Failed
2340             elif self.is_expired(synchronize=False):
2341                 return WorkStatus.Expired
2342             elif self.is_cancelled(synchronize=False):
2343                 return WorkStatus.Cancelled
2344             elif self.is_suspended(synchronize=False):
2345                 return WorkStatus.Suspended
2346         return WorkStatus.Transforming
2347 
2348     def depend_on(self, work):
2349         return False
2350 
2351     def add_proxy(self):
2352         self.proxy = get_proxy()
2353         if not self.proxy:
2354             raise Exception("Cannot get local proxy")
2355 
2356     def get_proxy(self):
2357         return self.proxy
2358 
2359 
2360 class Workflow(Base):
2361     def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None, logger=None):
2362         # super(Workflow, self).__init__(name=name, workload_id=workload_id, lifetime=lifetime, pending_time=pending_time, logger=logger)
2363         self.logger = logger
2364         if self.logger is None:
2365             self.setup_logger()
2366 
2367         self.template = WorkflowBase(name=name, workload_id=workload_id, lifetime=lifetime, pending_time=pending_time, logger=logger)
2368 
2369         self.build_work = None
2370         # parent_num_run is string.
2371         self.parent_num_run = None
2372         self._num_run = 0
2373         self.runs = {}
2374         self.loop_condition_position = 'end'
2375 
2376     def setup_logger(self):
2377         # Setup logger
2378         self.logger = logging.getLogger(self.get_class_name())
2379 
2380     def log_info(self, info):
2381         if self.logger is None:
2382             self.setup_logger()
2383         self.logger.info(info)
2384 
2385     def log_debug(self, info):
2386         if self.logger is None:
2387             self.setup_logger()
2388         self.logger.debug(info)
2389 
2390     def __deepcopy__(self, memo):
2391         logger = self.logger
2392         self.logger = None
2393 
2394         cls = self.__class__
2395         result = cls.__new__(cls)
2396 
2397         memo[id(self)] = result
2398 
2399         # Deep copy all other attributes
2400         for k, v in self.__dict__.items():
2401             setattr(result, k, copy.deepcopy(v, memo))
2402 
2403         self.logger = logger
2404         result.logger = logger
2405         return result
2406 
2407     def get_template_id(self):
2408         return self.template.get_template_id()
2409 
2410     def is_with_steps(self):
2411         return self.template.is_with_steps()
2412 
2413     def set_additional_data_storage(self, storage):
2414         self.template.set_additional_data_storage(storage)
2415 
2416     def get_additional_data_storage(self):
2417         return self.template.get_additional_data_storage()
2418 
2419     def convert_data_to_additional_data_storage(self, storage, storage_name=None, replace_storage_name=False):
2420         return self.template.convert_data_to_additional_data_storage(storage, storage_name=storage_name, replace_storage_name=replace_storage_name)
2421 
2422     @property
2423     def is_workflow_step(self):
2424         if self.runs:
2425             return self.runs[str(self.num_run)].is_workflow_step
2426         return self.template.is_workflow_step
2427 
2428     @is_workflow_step.setter
2429     def is_workflow_step(self, value):
2430         if self.runs:
2431             self.runs[str(self.num_run)].is_workflow_step = value
2432         self.template.is_workflow_step = value
2433 
2434     @property
2435     def step_name(self):
2436         if self.runs:
2437             return self.runs[str(self.num_run)].step_name
2438         return self.template.step_name
2439 
2440     @step_name.setter
2441     def step_name(self, value):
2442         if self.runs:
2443             self.runs[str(self.num_run)].step_name = value
2444         self.template.step_name = value
2445 
2446     @property
2447     def workflow_data(self):
2448         if self.runs:
2449             return self.runs[str(self.num_run)].workflow_data
2450         return self.template.workflow_data
2451 
2452     @workflow_data.setter
2453     def workflow_data(self, value):
2454         if self.runs:
2455             self.runs[str(self.num_run)].workflow_data = value
2456         self.template.workflow_data = value
2457 
2458     def split_workflow_to_steps(self, request_cache=None, max_request_length=None):
2459         return self.template.split_workflow_to_steps(request_cache=request_cache, max_request_length=max_request_length)
2460 
2461     @property
2462     def metadata(self):
2463         run_metadata = {'build': self.get_build_metadata(),
2464                         'parent_num_run': self.parent_num_run,
2465                         'num_run': self._num_run,
2466                         'runs': {}}
2467         for run_id in self.runs:
2468             run_metadata['runs'][run_id] = self.runs[run_id].metadata
2469         if not self.runs:
2470             run_metadata['parameter_links'] = self.template.get_parameter_links_metadata()
2471         return run_metadata
2472 
2473     @metadata.setter
2474     def metadata(self, value):
2475         self.template.load_metadata()
2476         run_metadata = value
2477         build_metadata = run_metadata['build'] if 'build' in run_metadata else {}
2478         self.set_build_metadata(build_metadata)
2479         self.parent_num_run = run_metadata['parent_num_run']
2480         self._num_run = run_metadata['num_run']
2481         runs = run_metadata['runs']
2482         if not runs and 'parameter_links' in run_metadata:
2483             parameter_links = run_metadata['parameter_links']
2484             self.template.set_parameter_links_metadata(parameter_links)
2485         for run_id in runs:
2486             self.template.parent_num_run = self.parent_num_run
2487             self.runs[run_id] = self.template.copy()
2488             self.runs[run_id].metadata = runs[run_id]
2489             self.runs[run_id].parent_num_run = self.parent_num_run
2490         # self.add_metadata_item('runs', )
2491 
2492     @property
2493     def independent_works(self):
2494         if self.runs:
2495             return self.runs[str(self.num_run)].independent_works
2496         return self.template.independent_works
2497 
2498     @independent_works.setter
2499     def independent_works(self, value):
2500         if self.runs:
2501             self.runs[str(self.num_run)].independent_works = value
2502         self.template.independent_works = value
2503 
2504     def add_next_work(self, work_id):
2505         if self.runs:
2506             self.runs[str(self.num_run)].add_next_work(work_id)
2507         else:
2508             raise Exception("There are no runs. It should not have next work")
2509 
2510     @property
2511     def last_updated_at(self):
2512         if self.runs:
2513             return self.runs[str(self.num_run)].last_updated_at
2514         return None
2515 
2516     @last_updated_at.setter
2517     def last_updated_at(self, value):
2518         if self.runs:
2519             self.runs[str(self.num_run)].last_updated_at = value
2520 
2521     @property
2522     def name(self):
2523         return self.template.name
2524 
2525     @name.setter
2526     def name(self, value):
2527         self.template.name = value
2528 
2529     @property
2530     def campaign(self):
2531         return self.template.campaign
2532 
2533     @campaign.setter
2534     def campaign(self, value):
2535         self.template.campaign = value
2536 
2537     @property
2538     def campaign_scope(self):
2539         return self.template.campaign_scope
2540 
2541     @campaign_scope.setter
2542     def campaign_scope(self, value):
2543         self.template.campaign_scope = value
2544 
2545     @property
2546     def campaign_group(self):
2547         return self.template._campaign_group
2548 
2549     @campaign_group.setter
2550     def campaign_group(self, value):
2551         self.template._campaign_group = value
2552 
2553     @property
2554     def campaign_tag(self):
2555         return self.template._campaign_tag
2556 
2557     @campaign_tag.setter
2558     def campaign_tag(self, value):
2559         self.template._campaign_tag = value
2560 
2561     @property
2562     def max_processing_requests(self):
2563         return self.template._max_processing_requests
2564 
2565     @max_processing_requests.setter
2566     def max_processing_requests(self, value):
2567         self.template._max_processing_requests = value
2568 
2569     @property
2570     def username(self):
2571         return self.template.username
2572 
2573     @username.setter
2574     def username(self, value):
2575         self.template.username = value
2576 
2577     @property
2578     def userdn(self):
2579         return self.template.userdn
2580 
2581     @userdn.setter
2582     def userdn(self, value):
2583         self.template.userdn = value
2584 
2585     @property
2586     def lifetime(self):
2587         return self.template.lifetime
2588 
2589     @lifetime.setter
2590     def lifetime(self, value):
2591         self.template.lifetime = value
2592 
2593     @property
2594     def to_cancel(self):
2595         return self.template.to_cancel
2596 
2597     @to_cancel.setter
2598     def to_cancel(self, value):
2599         if self.runs:
2600             self.runs[str(self.num_run)].to_cancel = value
2601         self.template.to_cancel = value
2602 
2603     @property
2604     def num_run(self):
2605         return self._num_run
2606 
2607     @num_run.setter
2608     def num_run(self, value):
2609         self._num_run = value
2610 
2611     def get_combined_num_run(self):
2612         if self.parent_num_run:
2613             return str(self.parent_num_run) + "_" + str(self.num_run)
2614         return str(self.num_run)
2615 
2616     @property
2617     def transforming(self):
2618         if self.runs and str(self.num_run) in self.runs:
2619             return True
2620         return False
2621 
2622     @transforming.setter
2623     def transforming(self, value):
2624         if self._num_run < 1:
2625             self._num_run = 1
2626         if str(self.num_run) not in self.runs:
2627             self.template.parent_num_run = self.parent_num_run
2628             self.runs[str(self.num_run)] = self.template.copy()
2629             if self.runs[str(self.num_run)].has_loop_condition():
2630                 self.runs[str(self.num_run)].num_run = self.num_run
2631                 self.runs[str(self.num_run)].parent_num_run = self.parent_num_run
2632 
2633             # self.runs[str(self.num_run)].parent_num_run = self.get_combined_num_run()
2634             if self.runs[str(self.num_run)].has_loop_condition():
2635                 if self._num_run > 1:
2636                     p_metadata = self.runs[str(self.num_run - 1)].get_metadata_item('parameter_links')
2637                     self.runs[str(self.num_run)].add_metadata_item('parameter_links', p_metadata)
2638 
2639     @property
2640     def submitted(self):
2641         return self.transforming
2642 
2643     @submitted.setter
2644     def submitted(self, value):
2645         pass
2646 
2647     def has_dependency(self):
2648         return False
2649 
2650     def set_workload_id(self, workload_id):
2651         if self.runs:
2652             self.runs[str(self.num_run)].workload_id = workload_id
2653         else:
2654             self.template.workload_id = workload_id
2655         # self.dynamic.workload_id = workload_id
2656 
2657     def set_internal_id(self, value):
2658         if self.runs:
2659             return self.runs[str(self.num_run)].set_internal_id(value)
2660         return self.template.set_internal_id(value)
2661 
2662     def get_internal_id(self):
2663         if self.runs:
2664             return self.runs[str(self.num_run)].get_internal_id()
2665         return self.template.get_internal_id()
2666 
2667     def get_workload_id(self):
2668         if self.runs:
2669             return self.runs[str(self.num_run)].workload_id
2670         return self.template.workload_id
2671 
2672     def get_site(self):
2673         return self.template.get_site()
2674 
2675     def get_cloud(self):
2676         return self.template.get_cloud()
2677 
2678     def get_queue(self):
2679         return self.template.get_queue()
2680 
2681     def add_work(self, work, initial=False, primary=False):
2682         self.template.add_work(work, initial, primary)
2683 
2684     def add_build_work(self, work, initial=False, primary=False):
2685         self.template.add_build_work(work, initial, primary)
2686 
2687     def get_build_metadata(self):
2688         if self.build_work is not None:
2689             return self.build_work.metadata
2690 
2691         build_work = self.template.get_build_work()
2692         if build_work is not None:
2693             return build_work.metadata
2694         return None
2695 
2696     def set_build_metadata(self, metadata):
2697         if self.build_work is not None:
2698             self.build_work.metadata = metadata
2699 
2700         build_work = self.template.get_build_work()
2701         if build_work is not None:
2702             self.build_work = self.template.copy()
2703             self.build_work.metadata = metadata
2704 
2705     def get_build_work(self):
2706         if self.build_work is not None:
2707             return self.build_work
2708         build_work = self.template.get_build_work()
2709         if build_work is not None:
2710             self.build_work = self.template.copy()
2711             self.build_work.sign()
2712             return self.build_work
2713 
2714     def has_to_build_work(self):
2715         build_work = self.get_buil_work()
2716         if build_work is None:
2717             return False
2718 
2719         if not (build_work.is_started() or build_work.is_starting()):
2720             return True
2721         return False
2722 
2723     def add_condition(self, cond):
2724         self.template.add_condition(cond)
2725 
2726     def add_parameter_link(self, work_source, work_destinations, parameter_link):
2727         self.template.add_parameter_link(work_source, work_destinations, parameter_link)
2728 
2729     def find_workflow_from_work(self, work):
2730         return self.template.find_workflow_from_work(work)
2731 
2732     def find_parameter_links_from_id(self, internal_id):
2733         if self.runs:
2734             return self.runs[str(self.num_run)].find_parameter_links_from_id(internal_id)
2735         return self.template.find_parameter_links_from_id(internal_id)
2736 
2737     def refresh_parameter_links(self):
2738         if self.runs:
2739             self.runs[str(self.num_run)].refresh_parameter_links()
2740 
2741     def set_global_parameters(self, value):
2742         self.template.set_global_parameters(value)
2743 
2744     def set_sliced_global_parameters(self, source, name=None, index=0):
2745         self.template.set_sliced_global_parameters(source, name=name, index=index)
2746 
2747     def sync_global_parameters_from_work(self, work):
2748         if self.runs:
2749             return self.runs[str(self.num_run)].sync_global_parameters_from_work(work)
2750         return self.template.sync_global_parameters_from_work(work)
2751 
2752     def sync_global_parameters(self, global_parameters, sliced_global_parameters=None):
2753         if self.runs:
2754             return self.runs[str(self.num_run)].sync_global_parameters(global_parameters, sliced_global_parameters)
2755         return self.template.sync_global_parameters(global_parameters, sliced_global_parameters)
2756 
2757     def get_new_works(self, synchronize=True):
2758         self.logger.info("%s get_new_works" % self.get_internal_id())
2759 
2760         build_work = self.get_build_work()
2761         if build_work:
2762             build_work.num_run = 0
2763             if not (build_work.is_started() or build_work.is_starting()):
2764                 return build_work
2765             elif not build_work.is_terminated():
2766                 return []
2767             elif build_work.is_terminated() and not build_work.is_finished():
2768                 return []
2769 
2770         self.log_debug("synchronizing works")
2771         if synchronize:
2772             self.sync_works(to_cancel=self.to_cancel)
2773         self.log_debug("synchronized works")
2774         works = []
2775         self.log_debug("%s num_run: %s" % (self.get_internal_id(), self.num_run))
2776         self.log_debug("%s runs: %s" % (self.get_internal_id(), str(self.runs)))
2777         if self.runs:
2778             # works = self.runs[str(self.num_run)].get_new_works(synchronize=False)
2779             works = self.runs[str(self.num_run)].get_new_works(synchronize=True)
2780             self.logger.info("%s new workers: %s" % (self.get_internal_id(), str(works)))
2781             self.runs[str(self.num_run)].transforming = True
2782             if works:
2783                 for work in works:
2784                     work.num_run = int(self.num_run) if self.num_run is not None else 0
2785         self.logger.info("%s get_new_works done" % self.get_internal_id())
2786         return works
2787 
2788     def get_current_works(self):
2789         build_work = self.get_build_work()
2790         if build_work:
2791             build_work.num_run = 0
2792             if (build_work.is_started() or build_work.is_starting()):
2793                 if (not build_work.is_terminated()):
2794                     return [build_work]
2795                 elif not build_work.is_finished():
2796                     return []
2797             else:
2798                 return []
2799 
2800         self.sync_works(to_cancel=self.to_cancel)
2801         if self.runs:
2802             works = self.runs[str(self.num_run)].get_current_works()
2803             for work in works:
2804                 work.num_run = int(self.num_run) if self.num_run is not None else 0
2805         return []
2806 
2807     def get_all_works(self, synchronize=True):
2808         self.logger.info("%s get_all_works" % self.get_internal_id())
2809         works = []
2810 
2811         build_work = self.get_build_work()
2812         if build_work:
2813             if build_work.is_finished():
2814                 build_work.num_run = 0
2815                 works = [build_work]
2816             else:
2817                 return [build_work]
2818 
2819         if synchronize:
2820             self.sync_works(to_cancel=self.to_cancel)
2821         if self.runs:
2822             run_works = self.runs[str(self.num_run)].get_all_works(synchronize=False)
2823             if run_works:
2824                 for work in run_works:
2825                     work.num_run = int(self.num_run) if self.num_run is not None else 0
2826 
2827             works = works + run_works
2828 
2829         self.logger.info("%s get_all_works done" % self.get_internal_id())
2830         return works
2831 
2832     def get_primary_initial_collection(self):
2833         if self.runs:
2834             return self.runs[str(self.num_run)].get_primary_initial_collection()
2835         return self.template.get_primary_initial_collection()
2836 
2837     def resume_works(self):
2838         if self.runs:
2839             self.runs[str(self.num_run)].resume_works()
2840         self.template.to_cancel = False
2841 
2842     def clean_works(self):
2843         # if self.runs:
2844         #     self.runs[str(self.num_run)].clean_works()
2845         self.template.clean_works()
2846         self.parent_num_run = None
2847         self._num_run = 0
2848         self.runs = {}
2849         self.build_work = None
2850 
2851     def is_to_expire(self, expired_at=None, pending_time=None, request_id=None):
2852         build_work = self.get_build_work()
2853         if build_work:
2854             if not build_work.is_terminated():
2855                 return build_work.is_to_expire(expired_at=expired_at, pending_time=pending_time, request_id=request_id)
2856             elif build_work.is_terminated() and not build_work.is_finished():
2857                 return False
2858             else:
2859                 pass
2860 
2861         if self.runs:
2862             return self.runs[str(self.num_run)].is_to_expire(expired_at=expired_at, pending_time=pending_time, request_id=request_id)
2863         return False
2864 
2865     def is_terminated(self, synchronize=True):
2866         build_work = self.get_build_work()
2867         if build_work:
2868             if build_work.is_terminated():
2869                 if not build_work.is_finished():
2870                     return True
2871                 else:
2872                     pass
2873             else:
2874                 return False
2875 
2876         if self.runs:
2877             if self.runs[str(self.num_run)].is_terminated(synchronize=synchronize):
2878                 if not self.runs[str(self.num_run)].has_loop_condition() or not self.runs[str(self.num_run)].get_loop_condition_status():
2879                     return True
2880         return False
2881 
2882     def is_finished(self, synchronize=True):
2883         if self.is_terminated(synchronize=synchronize):
2884             build_work = self.get_build_work()
2885             if build_work:
2886                 if build_work.is_terminated():
2887                     if not build_work.is_finished():
2888                         return False
2889                     else:
2890                         pass
2891                 else:
2892                     return False
2893             return self.runs[str(self.num_run)].is_finished(synchronize=False)
2894         return False
2895 
2896     def is_subfinished(self, synchronize=True):
2897         if self.is_terminated(synchronize=synchronize):
2898             build_work = self.get_build_work()
2899             if build_work:
2900                 if build_work.is_terminated():
2901                     if not build_work.is_finished():
2902                         return False
2903                     else:
2904                         pass
2905                 else:
2906                     return False
2907             return self.runs[str(self.num_run)].is_subfinished(synchronize=False)
2908         return False
2909 
2910     def is_processed(self, synchronize=True):
2911         if self.is_terminated(synchronize=synchronize):
2912             build_work = self.get_build_work()
2913             if build_work:
2914                 if build_work.is_terminated():
2915                     if not build_work.is_processed():
2916                         return False
2917                     else:
2918                         pass
2919                 else:
2920                     return False
2921             return self.runs[str(self.num_run)].is_processed(synchronize=False)
2922         return False
2923 
2924     def is_failed(self, synchronize=True):
2925         if self.is_terminated(synchronize=synchronize):
2926             build_work = self.get_build_work()
2927             if build_work:
2928                 if build_work.is_terminated():
2929                     if not build_work.is_finished():
2930                         return True
2931                     else:
2932                         pass
2933                 else:
2934                     return False
2935             return self.runs[str(self.num_run)].is_failed(synchronize=False)
2936         return False
2937 
2938     def is_expired(self, synchronize=True):
2939         if self.is_terminated(synchronize=synchronize):
2940             build_work = self.get_build_work()
2941             if build_work:
2942                 if build_work.is_terminated():
2943                     if not build_work.is_finished():
2944                         return False
2945                     else:
2946                         pass
2947                 else:
2948                     if build_work.is_expired():
2949                         return True
2950                     else:
2951                         return False
2952             return self.runs[str(self.num_run)].is_expired(synchronize=False)
2953         return False
2954 
2955     def is_cancelled(self, synchronize=True):
2956         if self.is_terminated(synchronize=synchronize):
2957             build_work = self.get_build_work()
2958             if build_work:
2959                 if build_work.is_terminated():
2960                     if not build_work.is_finished():
2961                         return False
2962                     else:
2963                         pass
2964                 else:
2965                     if build_work.is_cancelled():
2966                         return True
2967                     else:
2968                         return False
2969             return self.runs[str(self.num_run)].is_cancelled(synchronize=False)
2970         return False
2971 
2972     def is_suspended(self, synchronize=True):
2973         if self.is_terminated(synchronize=synchronize):
2974             build_work = self.get_build_work()
2975             if build_work:
2976                 if build_work.is_terminated():
2977                     if not build_work.is_finished():
2978                         return False
2979                     else:
2980                         pass
2981                 else:
2982                     if build_work.is_suspended():
2983                         return True
2984                     else:
2985                         return False
2986             return self.runs[str(self.num_run)].is_suspended(synchronize=False)
2987         return False
2988 
2989     def get_terminated_msg(self):
2990         if self.is_terminated():
2991             return self.runs[str(self.num_run)].get_terminated_msg()
2992         return None
2993 
2994     def get_status(self):
2995         if not self.runs:
2996             return WorkStatus.New
2997         if not self.is_terminated():
2998             return WorkStatus.Transforming
2999         return self.runs[str(self.num_run)].get_status()
3000 
3001     def depend_on(self, work):
3002         return self.template.depend_on(work)
3003 
3004     def add_proxy(self):
3005         self.template.add_proxy()
3006 
3007     def get_proxy(self):
3008         self.template.get_proxy()
3009 
3010     def add_loop_condition(self, condition, position='end'):
3011         if not position or position != 'begin':
3012             position = 'end'
3013         position = 'end'    # force position to end currently. position = 'begin' is not supported now.
3014         self.template.add_loop_condition(condition, position=position)
3015         self.loop_condition_position = position
3016 
3017     def refresh(self):
3018         self.refresh_works()
3019 
3020     def refresh_works(self, clean=False):
3021         if self.runs:
3022             self.runs[str(self.num_run)].refresh_works(clean=clean)
3023 
    def sync_works(self, to_cancel=False):
        """Synchronize the current run and start the next loop iteration if needed.

        Steps, in order:

        1. Record a truthy ``to_cancel`` on ``self.to_cancel``.
        2. Return immediately while a build work exists that has not
           terminated, or that terminated without finishing.
        3. Make sure a run exists for the current run number, creating it from
           a copy of the template when missing.
        4. Call ``sync_works`` on the current run.
        5. If that run is terminated, not being cancelled, and its loop
           condition status is truthy, bump the run counter and create the
           next run from a fresh template copy.

        ``refresh_works()`` is called before step 3 and again at the very end.

        :param to_cancel: when truthy, stored on ``self.to_cancel``, forwarded
                          to the current run, and prevents a new loop
                          iteration from being started.
        """
        if to_cancel:
            self.to_cancel = to_cancel

        build_work = self.get_build_work()
        if build_work:
            # Nothing to synchronize until the build work has finished.
            if not build_work.is_terminated():
                return
            elif build_work.is_terminated() and not build_work.is_finished():
                return

        self.refresh_works()
        # position is end.
        # NOTE(review): num_run is assigned here while _num_run is incremented
        # directly further down - presumably num_run is a property over
        # _num_run; confirm.
        if self.num_run < 1:
            self.num_run = 1
        if str(self.num_run) not in self.runs:
            # First time this run number is seen: instantiate it from the template.
            self.template.parent_num_run = self.parent_num_run
            self.runs[str(self.num_run)] = self.template.copy()
            if self.runs[str(self.num_run)].has_loop_condition():
                self.runs[str(self.num_run)].num_run = self.num_run

            # self.runs[str(self.num_run)].parent_num_run = self.get_combined_num_run()
            if self.runs[str(self.num_run)].has_loop_condition():
                if self.num_run > 1:
                    # Carry the 'parameter_links' metadata over from the previous run.
                    p_metadata = self.runs[str(self.num_run - 1)].get_metadata_item('parameter_links')
                    self.runs[str(self.num_run)].add_metadata_item('parameter_links', p_metadata)

        self.runs[str(self.num_run)].sync_works(to_cancel=to_cancel)

        if self.runs[str(self.num_run)].is_terminated(synchronize=False, new=True):
            self.logger.info("%s num_run %s is_terminated" % (self.get_internal_id(), self.num_run))
            if to_cancel:
                # Being cancelled: only log, never start another iteration.
                self.logger.info("num_run %s, to cancel" % self.num_run)
            else:
                if self.runs[str(self.num_run)].has_loop_condition():
                    if self.runs[str(self.num_run)].get_loop_condition_status():
                        self.logger.info("%s num_run %s get_loop_condition_status %s, start next run" % (self.get_internal_id(), self.num_run, self.runs[str(self.num_run)].get_loop_condition_status()))
                        # From here on self.num_run designates the new run and
                        # self.num_run - 1 the run that just terminated.
                        self._num_run += 1
                        self.logger.info("new num_run is %s" % (self.num_run))
                        self.template.parent_num_run = self.parent_num_run
                        self.runs[str(self.num_run)] = self.template.copy()

                        if self.runs[str(self.num_run)].has_loop_condition():
                            self.runs[str(self.num_run)].num_run = self.num_run

                        # Hand 'parameter_links' and the global parameters of
                        # the previous run over to the new one.
                        p_metadata = self.runs[str(self.num_run - 1)].get_metadata_item('parameter_links')
                        self.runs[str(self.num_run)].add_metadata_item('parameter_links', p_metadata)

                        self.runs[str(self.num_run)].global_parameters = self.runs[str(self.num_run - 1)].global_parameters
                        self.logger.info("%s new num_run is %s" % (self.get_internal_id(), self.num_run))
                    else:
                        self.logger.info("%s num_run %s get_loop_condition_status %s, terminated loop" % (self.get_internal_id(), self.num_run, self.runs[str(self.num_run)].get_loop_condition_status()))
        self.refresh_works()
3077 
3078     def get_relation_map(self):
3079         if not self.runs:
3080             return []
3081         if self.template.has_loop_condition():
3082             rets = {}
3083             for run in self.runs:
3084                 rets[run] = self.runs[run].get_relation_map()
3085             return [rets]
3086         else:
3087             return self.runs[str(self.num_run)].get_relation_map()
3088 
3089 
class SubWorkflow(Workflow):
    """Workflow subclass whose constructor forwards its arguments to ``Workflow``."""

    def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None, logger=None):
        # Every argument is handed to Workflow.__init__ unchanged.
        super(SubWorkflow, self).__init__(
            name=name,
            workload_id=workload_id,
            lifetime=lifetime,
            pending_time=pending_time,
            logger=logger,
        )
3094 
3095 
class LoopWorkflow(Workflow):
    """Workflow subclass whose constructor forwards its arguments to ``Workflow``."""

    def __init__(self, name=None, workload_id=None, lifetime=None, pending_time=None, logger=None):
        # Every argument is handed to Workflow.__init__ unchanged.
        super(LoopWorkflow, self).__init__(
            name=name,
            workload_id=workload_id,
            lifetime=lifetime,
            pending_time=pending_time,
            logger=logger,
        )