Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2021 - 2022
0010 
0011 
0012 """
0013 Test workflow conditions.
0014 """
0015 
0016 import unittest2 as unittest
0017 
0018 # from nose.tools import assert_equal
0019 from idds.common.utils import setup_logging
0020 
0021 from idds.common.utils import json_dumps, json_loads
0022 
0023 from idds.workflowv2.work import Work, WorkStatus
0024 from idds.workflowv2.workflow import (
0025     CompositeCondition,
0026     AndCondition,
0027     OrCondition,
0028     Condition,
0029     ConditionTrigger,
0030     Workflow,
0031     ParameterLink,
0032 )
0033 
0034 
# Initialise logging through the helper imported from idds.common.utils,
# passing this module's name.
setup_logging(__name__)
0036 
0037 
0038 class TestWorkflowCondtion(unittest.TestCase):
0039 
0040     def test_work_custom_condition(self):
0041         # init_p = Parameter({'input_dataset': 'data17:data17.test.raw.1'})
0042         work1 = Work(
0043             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0044         )
0045         work1.add_custom_condition("to_exit", True)
0046         assert work1.get_custom_condition_status() is False
0047         work1.to_exit = False
0048         assert work1.get_custom_condition_status() is False
0049         work1.to_exit = "False"
0050         assert work1.get_custom_condition_status() is False
0051         work1.to_exit = True
0052         assert work1.get_custom_condition_status() is True
0053 
0054         work1 = Work(
0055             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0056         )
0057         work1.add_custom_condition("to_exit", "true")
0058         assert work1.get_custom_condition_status() is False
0059         work1.to_exit = False
0060         assert work1.get_custom_condition_status() is False
0061         work1.to_exit = "False"
0062         assert work1.get_custom_condition_status() is False
0063         work1.to_exit = True
0064         assert work1.get_custom_condition_status() is False
0065         work1.to_exit = "true"
0066         assert work1.get_custom_condition_status() is True
0067 
0068         work1 = Work(
0069             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0070         )
0071         work1.add_custom_condition("to_exit1", True)
0072         work1.add_custom_condition("to_exit2", True)
0073         assert work1.get_custom_condition_status() is False
0074         work1.to_exit1 = False
0075         assert work1.get_custom_condition_status() is False
0076         work1.to_exit1 = "False"
0077         assert work1.get_custom_condition_status() is False
0078         work1.to_exit1 = True
0079         assert work1.get_custom_condition_status() is False
0080         work1.to_exit2 = "true"
0081         assert work1.get_custom_condition_status() is False
0082         work1.to_exit2 = True
0083         assert work1.get_custom_condition_status() is True
0084 
0085         work1 = Work(
0086             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0087         )
0088         work1.add_custom_condition("to_exit1", True, op="or")
0089         work1.add_custom_condition("to_exit2", True, op="or")
0090         assert work1.get_custom_condition_status() is False
0091         work1.to_exit1 = False
0092         assert work1.get_custom_condition_status() is False
0093         work1.to_exit1 = "False"
0094         assert work1.get_custom_condition_status() is False
0095         work1.to_exit1 = True
0096         assert work1.get_custom_condition_status() is True
0097         work1.to_exit1 = False
0098         work1.to_exit2 = "true"
0099         assert work1.get_custom_condition_status() is False
0100         work1.to_exit2 = True
0101         assert work1.get_custom_condition_status() is True
0102 
0103         work1 = Work(
0104             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0105         )
0106         # to_exit or (to_exit1 or to_exit2)
0107         work1.add_custom_condition("to_exit", True, op="and")
0108         work1.add_custom_condition("to_exit1", True, op="or")
0109         work1.add_custom_condition("to_exit2", True, op="or")
0110         assert work1.get_custom_condition_status() is False
0111         work1.to_exit1 = False
0112         assert work1.get_custom_condition_status() is False
0113         work1.to_exit1 = "False"
0114         assert work1.get_custom_condition_status() is False
0115         work1.to_exit1 = True
0116         assert work1.get_custom_condition_status() is True
0117         work1.to_exit1 = False
0118         work1.to_exit2 = "true"
0119         assert work1.get_custom_condition_status() is False
0120         work1.to_exit2 = True
0121         assert work1.get_custom_condition_status() is True
0122         work1.to_exit1 = False
0123         work1.to_exit2 = False
0124         work1.to_exit = True
0125         assert work1.get_custom_condition_status() is True
0126         assert work1.get_not_custom_condition_status() is False
0127 
    def test_condition(self):
        """Exercise the condition classes against changing Work statuses.

        Covers CompositeCondition, AndCondition, OrCondition and Condition,
        plus two nested setups (a Condition used as a true-branch entry, and a
        Condition's ``is_condition_true`` used as an input condition).  For
        each condition the test checks ``all_works``/``all_pre_works``/
        ``all_next_works``, ``get_condition_status`` and ``get_next_works``
        under the three ``ConditionTrigger`` modes.

        Statement order matters: Work statuses are toggled between ``New`` and
        ``Finished`` and reset after every section, and the ``ToTrigger``
        calls are asserted to return a branch only once.

        Returns the Workflow built at the top of the test (none of the
        conditions created here are added to it).
        """
        # init_p = Parameter({'input_dataset': 'data17:data17.test.raw.1'})
        work1 = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
        )
        work2 = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
        )
        work3 = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
        )
        work4 = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
        )
        work5 = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
        )
        work6 = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
        )
        work7 = Work(
            executable="echo",
            arguments="--in=IN_DATASET --out=OUT_DATASET",
            sandbox=None,
            work_id=7,
            primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
            output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
        )
        work8 = Work(
            executable="echo",
            arguments="--in=IN_DATASET --out=OUT_DATASET",
            sandbox=None,
            work_id=8,
            primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
            output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
        )

        # Only work1, work2, work3 and work8 are registered on the workflow.
        workflow = Workflow()
        workflow.add_work(work1, initial=True)
        workflow.add_work(work2, initial=True)
        workflow.add_work(work3, initial=False)
        workflow.add_work(work8, initial=False)

        # CompositeCondition with a single condition and single true/false works
        cond1 = CompositeCondition(
            conditions=work1.is_finished, true_works=work2, false_works=work3
        )
        works = cond1.all_works()
        assert works == [work1, work2, work3]
        works = cond1.all_pre_works()
        assert works == [work1]
        works = cond1.all_next_works()
        assert works == [work2, work3]
        cond_status = cond1.get_condition_status()
        assert cond_status is False

        work1.status = WorkStatus.Finished
        cond_status = cond1.get_condition_status()
        assert cond_status is True
        work1.status = WorkStatus.New

        # NotTriggered: false branch while work1 is New, true branch once Finished
        works = cond1.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work3]
        work1.status = WorkStatus.Finished
        works = cond1.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work2]
        work1.status = WorkStatus.New

        # ToTrigger: each branch is returned once; a repeated call returns []
        works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work3]
        works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.Finished
        works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work2]
        works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.New

        # Triggered: both branches are still returned after the ToTrigger calls above
        works = cond1.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work3]
        work1.status = WorkStatus.Finished
        works = cond1.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work2]
        work1.status = WorkStatus.New

        # CompositeCondition with three conditions; the status is asserted True
        # only when work1, work2 and work3 are all Finished
        cond2 = CompositeCondition(
            conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
            true_works=[work4, work5],
            false_works=[work6, work7],
        )

        works = cond2.all_works()
        assert works == [work1, work2, work3, work4, work5, work6, work7]
        works = cond2.all_pre_works()
        assert works == [work1, work2, work3]
        works = cond2.all_next_works()
        assert works == [work4, work5, work6, work7]
        cond_status = cond2.get_condition_status()
        assert cond_status is False

        work1.status = WorkStatus.Finished
        cond_status = cond2.get_condition_status()
        assert cond_status is False
        work2.status = WorkStatus.Finished
        cond_status = cond2.get_condition_status()
        assert cond_status is False
        work3.status = WorkStatus.Finished
        cond_status = cond2.get_condition_status()
        assert cond_status is True
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond2.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work6, work7]
        work1.status = WorkStatus.Finished
        work2.status = WorkStatus.Finished
        work3.status = WorkStatus.Finished
        works = cond2.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work4, work5]
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work6, work7]
        works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.Finished
        work2.status = WorkStatus.Finished
        work3.status = WorkStatus.Finished
        works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work4, work5]
        works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond2.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work6, work7]
        work1.status = WorkStatus.Finished
        work2.status = WorkStatus.Finished
        work3.status = WorkStatus.Finished
        works = cond2.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work4, work5]
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        # AndCondition: same inputs and same expected results as cond2 above
        cond3 = AndCondition(
            conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
            true_works=[work4, work5],
            false_works=[work6, work7],
        )

        works = cond3.all_works()
        assert works == [work1, work2, work3, work4, work5, work6, work7]
        works = cond3.all_pre_works()
        assert works == [work1, work2, work3]
        works = cond3.all_next_works()
        assert works == [work4, work5, work6, work7]
        cond_status = cond3.get_condition_status()
        assert cond_status is False

        work1.status = WorkStatus.Finished
        cond_status = cond3.get_condition_status()
        assert cond_status is False
        work2.status = WorkStatus.Finished
        cond_status = cond3.get_condition_status()
        assert cond_status is False
        work3.status = WorkStatus.Finished
        cond_status = cond3.get_condition_status()
        assert cond_status is True
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond3.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work6, work7]
        work1.status = WorkStatus.Finished
        work2.status = WorkStatus.Finished
        work3.status = WorkStatus.Finished
        works = cond3.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work4, work5]
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work6, work7]
        works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.Finished
        work2.status = WorkStatus.Finished
        work3.status = WorkStatus.Finished
        works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work4, work5]
        works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond3.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work6, work7]
        work1.status = WorkStatus.Finished
        work2.status = WorkStatus.Finished
        work3.status = WorkStatus.Finished
        works = cond3.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work4, work5]
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        # OrCondition: the status is asserted True as soon as any one of
        # work1, work2, work3 is Finished
        cond4 = OrCondition(
            conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
            true_works=[work4, work5],
            false_works=[work6, work7],
        )

        works = cond4.all_works()
        assert works == [work1, work2, work3, work4, work5, work6, work7]
        works = cond4.all_pre_works()
        assert works == [work1, work2, work3]
        works = cond4.all_next_works()
        assert works == [work4, work5, work6, work7]
        cond_status = cond4.get_condition_status()
        assert cond_status is False

        # Finish exactly one work at a time and reset the previous one
        work1.status = WorkStatus.Finished
        cond_status = cond4.get_condition_status()
        assert cond_status is True
        work1.status = WorkStatus.New
        work2.status = WorkStatus.Finished
        cond_status = cond4.get_condition_status()
        assert cond_status is True
        work2.status = WorkStatus.New
        work3.status = WorkStatus.Finished
        cond_status = cond4.get_condition_status()
        assert cond_status is True
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond4.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work6, work7]
        work1.status = WorkStatus.Finished
        # work2.status = WorkStatus.Finished
        # work3.status = WorkStatus.Finished
        works = cond4.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work4, work5]
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work6, work7]
        works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.Finished
        # work2.status = WorkStatus.Finished
        # work3.status = WorkStatus.Finished
        works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work4, work5]
        works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        works = cond4.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work6, work7]
        work1.status = WorkStatus.Finished
        # work2.status = WorkStatus.Finished
        # work3.status = WorkStatus.Finished
        works = cond4.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work4, work5]
        work1.status = WorkStatus.New
        work2.status = WorkStatus.New
        work3.status = WorkStatus.New

        # Condition: built with cond/true_work/false_work keywords; the
        # assertions mirror the cond1 section above
        cond5 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)

        works = cond5.all_works()
        assert works == [work1, work2, work3]
        works = cond5.all_pre_works()
        assert works == [work1]
        works = cond5.all_next_works()
        assert works == [work2, work3]
        cond_status = cond5.get_condition_status()
        assert cond_status is False

        work1.status = WorkStatus.Finished
        cond_status = cond5.get_condition_status()
        assert cond_status is True
        work1.status = WorkStatus.New

        works = cond5.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work3]
        work1.status = WorkStatus.Finished
        works = cond5.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work2]
        work1.status = WorkStatus.New

        works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work3]
        works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.Finished
        works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work2]
        works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work1.status = WorkStatus.New

        works = cond5.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work3]
        work1.status = WorkStatus.Finished
        works = cond5.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work2]
        work1.status = WorkStatus.New

        # multiple conditions: cond6 is placed in the true branch of cond7, so
        # cond7's next works include whichever branch cond6 selects
        cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
        cond7 = CompositeCondition(
            conditions=[work4.is_finished, work5.is_finished],
            true_works=[work6, cond6],
            false_works=work7,
        )

        # Results are sorted by work_id before comparing in this section.
        works = cond7.all_works()
        works.sort(key=lambda x: x.work_id)
        assert works == [work1, work2, work3, work4, work5, work6, work7]
        works = cond7.all_pre_works()
        works.sort(key=lambda x: x.work_id)
        assert works == [work1, work4, work5]
        works = cond7.all_next_works()
        works.sort(key=lambda x: x.work_id)
        # print([w.work_id for w in works])
        assert works == [work2, work3, work6, work7]
        cond_status = cond7.get_condition_status()
        assert cond_status is False

        work4.status = WorkStatus.Finished
        cond_status = cond7.get_condition_status()
        assert cond_status is False
        work5.status = WorkStatus.Finished
        cond_status = cond7.get_condition_status()
        assert cond_status is True
        work4.status = WorkStatus.New
        work5.status = WorkStatus.New

        works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work7]
        work4.status = WorkStatus.Finished
        work5.status = WorkStatus.Finished
        works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
        works.sort(key=lambda x: x.work_id)
        assert works == [work3, work6]
        work1.status = WorkStatus.Finished
        works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
        works.sort(key=lambda x: x.work_id)
        assert works == [work2, work6]
        work4.status = WorkStatus.New
        work5.status = WorkStatus.New
        work1.status = WorkStatus.New

        works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work7]
        works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work4.status = WorkStatus.Finished
        work5.status = WorkStatus.Finished
        works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
        works.sort(key=lambda x: x.work_id)
        assert works == [work3, work6]
        works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        # After work1 finishes only work2 is expected; work6 was already
        # returned by the ToTrigger call above.
        work1.status = WorkStatus.Finished
        works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
        works.sort(key=lambda x: x.work_id)
        assert works == [work2]
        works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
        works.sort(key=lambda x: x.work_id)
        assert works == []
        work4.status = WorkStatus.New
        work5.status = WorkStatus.New
        work1.status = WorkStatus.New

        works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work7]
        work4.status = WorkStatus.Finished
        work5.status = WorkStatus.Finished
        works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
        works.sort(key=lambda x: x.work_id)
        assert works == [work3, work6]
        work1.status = WorkStatus.Finished
        works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
        works.sort(key=lambda x: x.work_id)
        assert works == [work2, work6]
        work4.status = WorkStatus.New
        work5.status = WorkStatus.New
        work1.status = WorkStatus.New

        # multiple conditions: cond8 has no true/false works and its
        # is_condition_true is used as one of cond9's input conditions
        # cond8 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
        cond8 = Condition(cond=work1.is_finished)
        cond9 = CompositeCondition(
            conditions=[work4.is_finished, cond8.is_condition_true],
            true_works=[work6],
            false_works=work7,
        )

        works = cond9.all_works()
        works.sort(key=lambda x: x.work_id)
        assert works == [work1, work4, work6, work7]
        works = cond9.all_pre_works()
        works.sort(key=lambda x: x.work_id)
        assert works == [work1, work4]
        works = cond9.all_next_works()
        works.sort(key=lambda x: x.work_id)
        # print([w.work_id for w in works])
        assert works == [work6, work7]
        cond_status = cond9.get_condition_status()
        assert cond_status is False

        work4.status = WorkStatus.Finished
        cond_status = cond9.get_condition_status()
        assert cond_status is False
        work1.status = WorkStatus.Finished
        cond_status = cond9.get_condition_status()
        assert cond_status is True
        work4.status = WorkStatus.New
        work1.status = WorkStatus.New

        works = cond9.get_next_works(trigger=ConditionTrigger.NotTriggered)
        assert works == [work7]
        work4.status = WorkStatus.Finished
        work1.status = WorkStatus.Finished
        works = cond9.get_next_works(trigger=ConditionTrigger.NotTriggered)
        works.sort(key=lambda x: x.work_id)
        assert works == [work6]
        # work1 is already Finished here; the assignment below repeats it
        work1.status = WorkStatus.Finished
        works = cond9.get_next_works(trigger=ConditionTrigger.NotTriggered)
        works.sort(key=lambda x: x.work_id)
        assert works == [work6]
        work4.status = WorkStatus.New
        work1.status = WorkStatus.New

        works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == [work7]
        works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        work4.status = WorkStatus.Finished
        work1.status = WorkStatus.Finished
        works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
        works.sort(key=lambda x: x.work_id)
        assert works == [work6]
        works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
        assert works == []
        # work6 was already returned above, so further ToTrigger calls return []
        work1.status = WorkStatus.Finished
        works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
        works.sort(key=lambda x: x.work_id)
        assert works == []
        works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
        works.sort(key=lambda x: x.work_id)
        assert works == []
        work4.status = WorkStatus.New
        work1.status = WorkStatus.New

        works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
        assert works == [work7]
        work4.status = WorkStatus.Finished
        work1.status = WorkStatus.Finished
        works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
        works.sort(key=lambda x: x.work_id)
        assert works == [work6]
        work1.status = WorkStatus.Finished
        works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
        works.sort(key=lambda x: x.work_id)
        assert works == [work6]
        work4.status = WorkStatus.New
        work1.status = WorkStatus.New

        # NOTE(review): test runners generally expect test methods to return
        # None; confirm whether any caller relies on this return value.
        return workflow
0619 
0620     def print_workflow(self, workflow):
0621         print("print workflow")
0622         print(workflow.conditions)
0623         for cond_id in workflow.conditions:
0624             print(cond_id)
0625             cond = workflow.conditions[cond_id]
0626             print(cond)
0627             print(cond.conditions)
0628             print(cond.true_works)
0629             print(cond.false_works)
0630             for w in cond.true_works:
0631                 print(w)
0632                 if isinstance(w, CompositeCondition):
0633                     print(w.conditions)
0634                     print(w.true_works)
0635                     print(w.false_works)
0636 
0637     def test_workflow(self):
0638         work1 = Work(
0639             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0640         )
0641         work2 = Work(
0642             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
0643         )
0644         work3 = Work(
0645             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
0646         )
0647         work4 = Work(
0648             executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
0649         )
0650         work5 = Work(
0651             executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
0652         )
0653         work6 = Work(
0654             executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
0655         )
0656         work7 = Work(
0657             executable="echo",
0658             arguments="--in=IN_DATASET --out=OUT_DATASET",
0659             sandbox=None,
0660             work_id=7,
0661             primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
0662             output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
0663         )
0664         work8 = Work(
0665             executable="echo",
0666             arguments="--in=IN_DATASET --out=OUT_DATASET",
0667             sandbox=None,
0668             work_id=8,
0669             primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
0670             output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
0671         )
0672 
0673         workflow = Workflow()
0674         workflow.add_work(work1, initial=False)
0675         workflow.add_work(work2, initial=False)
0676         workflow.add_work(work3, initial=False)
0677         workflow.add_work(work4, initial=False)
0678         workflow.add_work(work5, initial=False)
0679         workflow.add_work(work6, initial=False)
0680         workflow.add_work(work7, initial=False)
0681         workflow.add_work(work8, initial=False)
0682 
0683         # multiple conditions
0684         cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0685         cond7 = CompositeCondition(
0686             conditions=[work4.is_finished, work5.is_finished],
0687             true_works=[work6, cond6],
0688             false_works=work7,
0689         )
0690 
0691         # multiple conditions
0692         # cond8 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0693         cond8 = Condition(cond=work1.is_finished)
0694         cond9 = CompositeCondition(
0695             conditions=[work4.is_finished, cond8.is_condition_true],
0696             true_works=[work6],
0697             false_works=work7,
0698         )
0699 
0700         workflow.add_condition(cond7)
0701         workflow.add_condition(cond9)
0702         id_works = workflow.independent_works
0703         # print(id_works)
0704         id_works.sort()
0705         id_works_1 = [work1, work4, work5, work8]
0706         id_works_1 = [w.get_template_id() for w in id_works_1]
0707         id_works_1.sort()
0708         # id_works.sort(key=lambda x: x.work_id)
0709         assert id_works == id_works_1
0710 
0711         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0712         # print(workflow_str)
0713         workflow1 = json_loads(workflow_str)
0714         # print('before load_metadata')
0715         # self.print_workflow(workflow1)
0716         workflow1.load_metadata()
0717         # print('after load_metadata')
0718         # self.print_workflow(workflow1)
0719         workflow_str1 = json_dumps(workflow1, sort_keys=True, indent=4)
0720         assert workflow_str == workflow_str1
0721 
0722         works = cond7.all_works()
0723         works.sort(key=lambda x: x.work_id)
0724         assert works == [work1, work2, work3, work4, work5, work6, work7]
0725         works = cond7.all_pre_works()
0726         works.sort(key=lambda x: x.work_id)
0727         assert works == [work1, work4, work5]
0728         works = cond7.all_next_works()
0729         works.sort(key=lambda x: x.work_id)
0730         # print([w.work_id for w in works])
0731         assert works == [work2, work3, work6, work7]
0732         cond_status = cond7.get_condition_status()
0733         assert cond_status is False
0734 
0735         work4.status = WorkStatus.Finished
0736         cond_status = cond7.get_condition_status()
0737         assert cond_status is False
0738         work5.status = WorkStatus.Finished
0739         cond_status = cond7.get_condition_status()
0740         assert cond_status is True
0741         work4.status = WorkStatus.New
0742         work5.status = WorkStatus.New
0743 
0744         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0745         assert works == [work7]
0746         work4.status = WorkStatus.Finished
0747         work5.status = WorkStatus.Finished
0748         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0749         works.sort(key=lambda x: x.work_id)
0750         assert works == [work3, work6]
0751         work1.status = WorkStatus.Finished
0752         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0753         works.sort(key=lambda x: x.work_id)
0754         assert works == [work2, work6]
0755         work4.status = WorkStatus.New
0756         work5.status = WorkStatus.New
0757         work1.status = WorkStatus.New
0758 
0759         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0760         assert works == [work7]
0761         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0762         assert works == []
0763         work4.status = WorkStatus.Finished
0764         work5.status = WorkStatus.Finished
0765         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0766         works.sort(key=lambda x: x.work_id)
0767         assert works == [work3, work6]
0768         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0769         assert works == []
0770         work1.status = WorkStatus.Finished
0771         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0772         works.sort(key=lambda x: x.work_id)
0773         assert works == [work2]
0774         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0775         works.sort(key=lambda x: x.work_id)
0776         assert works == []
0777         work4.status = WorkStatus.New
0778         work5.status = WorkStatus.New
0779         work1.status = WorkStatus.New
0780 
0781         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0782         assert works == [work7]
0783         work4.status = WorkStatus.Finished
0784         work5.status = WorkStatus.Finished
0785         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0786         works.sort(key=lambda x: x.work_id)
0787         assert works == [work3, work6]
0788         work1.status = WorkStatus.Finished
0789         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0790         works.sort(key=lambda x: x.work_id)
0791         assert works == [work2, work6]
0792         work4.status = WorkStatus.New
0793         work5.status = WorkStatus.New
0794         work1.status = WorkStatus.New
0795 
0796         # multiple conditions
0797         # cond8 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0798         # cond8 = Condition(cond=work1.is_finished)
0799         # cond9 = CompositeCondition(conditions=[work4.is_finished, cond8.is_condition_true], true_works=[work6], false_works=work7)
0800 
0801         works = cond9.all_works()
0802         works.sort(key=lambda x: x.work_id)
0803         assert works == [work1, work4, work6, work7]
0804         works = cond9.all_pre_works()
0805         works.sort(key=lambda x: x.work_id)
0806         assert works == [work1, work4]
0807         works = cond9.all_next_works()
0808         works.sort(key=lambda x: x.work_id)
0809         # print([w.work_id for w in works])
0810         assert works == [work6, work7]
0811         cond_status = cond9.get_condition_status()
0812         assert cond_status is False
0813 
0814         work4.status = WorkStatus.Finished
0815         cond_status = cond9.get_condition_status()
0816         assert cond_status is False
0817         work1.status = WorkStatus.Finished
0818         cond_status = cond9.get_condition_status()
0819         assert cond_status is True
0820         work4.status = WorkStatus.New
0821         work1.status = WorkStatus.New
0822 
0823         works = cond9.get_next_works(trigger=ConditionTrigger.NotTriggered)
0824         assert works == [work7]
0825         work4.status = WorkStatus.Finished
0826         work1.status = WorkStatus.Finished
0827         works = cond9.get_next_works(trigger=ConditionTrigger.NotTriggered)
0828         works.sort(key=lambda x: x.work_id)
0829         assert works == [work6]
0830         work1.status = WorkStatus.Finished
0831         works = cond9.get_next_works(trigger=ConditionTrigger.NotTriggered)
0832         works.sort(key=lambda x: x.work_id)
0833         assert works == [work6]
0834         work4.status = WorkStatus.New
0835         work1.status = WorkStatus.New
0836 
0837         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0838         assert works == [work7]
0839         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0840         assert works == []
0841         work4.status = WorkStatus.Finished
0842         work1.status = WorkStatus.Finished
0843         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0844         works.sort(key=lambda x: x.work_id)
0845         assert works == [work6]
0846         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0847         assert works == []
0848         work1.status = WorkStatus.Finished
0849         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0850         works.sort(key=lambda x: x.work_id)
0851         assert works == []
0852         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0853         works.sort(key=lambda x: x.work_id)
0854         assert works == []
0855         work4.status = WorkStatus.New
0856         work1.status = WorkStatus.New
0857 
0858         works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
0859         assert works == [work7]
0860         work4.status = WorkStatus.Finished
0861         work1.status = WorkStatus.Finished
0862         works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
0863         works.sort(key=lambda x: x.work_id)
0864         assert works == [work6]
0865         work1.status = WorkStatus.Finished
0866         works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
0867         works.sort(key=lambda x: x.work_id)
0868         assert works == [work6]
0869         work4.status = WorkStatus.New
0870         work1.status = WorkStatus.New
0871 
0872         return workflow
0873 
0874     def test_workflow_condition_reload(self):
0875         work1 = Work(
0876             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0877         )
0878         work2 = Work(
0879             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
0880         )
0881         work3 = Work(
0882             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
0883         )
0884         work4 = Work(
0885             executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
0886         )
0887         work5 = Work(
0888             executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
0889         )
0890         work6 = Work(
0891             executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
0892         )
0893         work7 = Work(
0894             executable="echo",
0895             arguments="--in=IN_DATASET --out=OUT_DATASET",
0896             sandbox=None,
0897             work_id=7,
0898             primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
0899             output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
0900         )
0901         work8 = Work(
0902             executable="echo",
0903             arguments="--in=IN_DATASET --out=OUT_DATASET",
0904             sandbox=None,
0905             work_id=8,
0906             primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
0907             output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
0908         )
0909 
0910         workflow = Workflow()
0911         workflow.add_work(work1, initial=False)
0912         workflow.add_work(work2, initial=False)
0913         workflow.add_work(work3, initial=False)
0914         workflow.add_work(work4, initial=False)
0915         workflow.add_work(work5, initial=False)
0916         workflow.add_work(work6, initial=False)
0917         workflow.add_work(work7, initial=False)
0918         workflow.add_work(work8, initial=False)
0919 
0920         # multiple conditions
0921         cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0922         cond7 = CompositeCondition(
0923             conditions=[work4.is_finished, work5.is_finished],
0924             true_works=[work6, cond6],
0925             false_works=work7,
0926         )
0927 
0928         # multiple conditions
0929         # cond8 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0930         cond8 = Condition(cond=work1.is_finished)
0931         cond9 = CompositeCondition(
0932             conditions=[work4.is_finished, cond8.is_condition_true],
0933             true_works=[work6],
0934             false_works=work7,
0935         )
0936 
0937         workflow.add_condition(cond7)
0938         workflow.add_condition(cond9)
0939 
0940         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0941         # print(workflow_str)
0942         workflow1 = json_loads(workflow_str)
0943         # print('before load_metadata')
0944         # self.print_workflow(workflow1)
0945         workflow1.load_metadata()
0946         # print('after load_metadata')
0947         # self.print_workflow(workflow1)
0948         workflow_str1 = json_dumps(workflow1, sort_keys=True, indent=4)
0949         assert workflow_str == workflow_str1
0950 
0951         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0952         assert works == [work7]
0953         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0954         assert works == []
0955         work4.status = WorkStatus.Finished
0956         work5.status = WorkStatus.Finished
0957         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0958         works.sort(key=lambda x: x.work_id)
0959         assert works == [work3, work6]
0960         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0961         assert works == []
0962         work1.status = WorkStatus.Finished
0963         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0964         works.sort(key=lambda x: x.work_id)
0965         assert works == [work2]
0966         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0967         works.sort(key=lambda x: x.work_id)
0968         assert works == []
0969         work4.status = WorkStatus.New
0970         work5.status = WorkStatus.New
0971         work1.status = WorkStatus.New
0972 
0973         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0974         assert works == [work7]
0975         work4.status = WorkStatus.Finished
0976         work5.status = WorkStatus.Finished
0977         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0978         works.sort(key=lambda x: x.work_id)
0979         assert works == [work3, work6]
0980         work1.status = WorkStatus.Finished
0981         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0982         works.sort(key=lambda x: x.work_id)
0983         assert works == [work2, work6]
0984         work4.status = WorkStatus.New
0985         work5.status = WorkStatus.New
0986         work1.status = WorkStatus.New
0987 
0988         # cond9
0989         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0990         assert works == [work7]
0991         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0992         assert works == []
0993         work4.status = WorkStatus.Finished
0994         work1.status = WorkStatus.Finished
0995         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0996         works.sort(key=lambda x: x.work_id)
0997         assert works == [work6]
0998         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
0999         assert works == []
1000         work1.status = WorkStatus.Finished
1001         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
1002         works.sort(key=lambda x: x.work_id)
1003         assert works == []
1004         works = cond9.get_next_works(trigger=ConditionTrigger.ToTrigger)
1005         works.sort(key=lambda x: x.work_id)
1006         assert works == []
1007         work4.status = WorkStatus.New
1008         work1.status = WorkStatus.New
1009 
1010         works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
1011         assert works == [work7]
1012         work4.status = WorkStatus.Finished
1013         work1.status = WorkStatus.Finished
1014         works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
1015         works.sort(key=lambda x: x.work_id)
1016         assert works == [work6]
1017         work1.status = WorkStatus.Finished
1018         works = cond9.get_next_works(trigger=ConditionTrigger.Triggered)
1019         works.sort(key=lambda x: x.work_id)
1020         assert works == [work6]
1021         work4.status = WorkStatus.New
1022         work1.status = WorkStatus.New
1023 
1024         return workflow
1025 
1026     def test_workflow_loop(self):
1027         work1 = Work(
1028             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1029         )
1030         work2 = Work(
1031             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1032         )
1033 
1034         workflow = Workflow()
1035         workflow.add_work(work1, initial=False)
1036         workflow.add_work(work2, initial=False)
1037 
1038         cond = Condition(cond=work2.is_finished)
1039         workflow.add_loop_condition(cond)
1040 
1041         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1042         # print(workflow_str)
1043         workflow1 = json_loads(workflow_str)
1044         # print('before load_metadata')
1045         # self.print_workflow(workflow1)
1046         workflow1.load_metadata()
1047         # print('after load_metadata')
1048         # self.print_workflow(workflow1)
1049         workflow_str1 = json_dumps(workflow1, sort_keys=True, indent=4)
1050         assert workflow_str == workflow_str1
1051 
1052     def test_workflow_loop1(self):
1053         work1 = Work(
1054             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1055         )
1056         work2 = Work(
1057             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1058         )
1059 
1060         workflow = Workflow()
1061         workflow.add_work(work1, initial=False)
1062         workflow.add_work(work2, initial=False)
1063 
1064         cond = Condition(cond=work2.is_finished)
1065         workflow.add_loop_condition(cond)
1066 
1067         works = workflow.get_new_works()
1068         works.sort(key=lambda x: x.work_id)
1069         assert works == [work1, work2]
1070         assert workflow.num_run == 1
1071 
1072         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1073         # print(workflow_str)
1074         return workflow
1075 
1076     def test_workflow_loop2(self):
1077         work1 = Work(
1078             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1079         )
1080         work2 = Work(
1081             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1082         )
1083 
1084         workflow = Workflow()
1085         workflow.add_work(work1, initial=False)
1086         workflow.add_work(work2, initial=False)
1087 
1088         cond = Condition(cond=work2.is_finished)
1089         workflow.add_loop_condition(cond)
1090 
1091         works = workflow.get_new_works()
1092         works.sort(key=lambda x: x.work_id)
1093         assert works == [work1, work2]
1094         assert workflow.num_run == 1
1095 
1096         for work in works:
1097             work.transforming = True
1098             work.status = WorkStatus.Finished
1099         works = workflow.get_new_works()
1100         works.sort(key=lambda x: x.work_id)
1101         assert works == [work1, work2]
1102         assert workflow.num_run == 2
1103         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1104         # print(workflow_str)
1105 
1106         for work in works:
1107             work.transforming = True
1108             work.status = WorkStatus.Finished
1109         works = workflow.get_new_works()
1110         works.sort(key=lambda x: x.work_id)
1111         assert works == [work1, work2]
1112         assert workflow.num_run == 3
1113         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1114         # print(workflow_str)
1115 
1116         for work in works:
1117             work.transforming = True
1118             work.status = WorkStatus.Failed
1119         works = workflow.get_new_works()
1120         works.sort(key=lambda x: x.work_id)
1121         assert works == []
1122         assert workflow.num_run == 3
1123         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1124         # print(workflow_str)
1125 
1126         return workflow
1127 
1128     def test_workflow_subworkflow(self):
1129         work1 = Work(
1130             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1131         )
1132         work2 = Work(
1133             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1134         )
1135 
1136         workflow1 = Workflow()
1137         workflow1.add_work(work1, initial=False)
1138         workflow1.add_work(work2, initial=False)
1139 
1140         work3 = Work(
1141             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1142         )
1143 
1144         workflow = Workflow()
1145         workflow.add_work(work3, initial=False)
1146         workflow.add_work(workflow1, initial=False)
1147 
1148         works = workflow.get_new_works()
1149         # print(json_dumps(workflow, sort_keys=True, indent=4))
1150         # print(json_dumps(works, sort_keys=True, indent=4))
1151         works.sort(key=lambda x: x.work_id)
1152         assert works == [work1, work2, work3]
1153         # assert(workflow1.num_run == 1)
1154 
1155         for work in works:
1156             # if work.work_id == 3:
1157             work.transforming = True
1158             work.status = WorkStatus.Failed
1159         works = workflow.get_new_works()
1160         works.sort(key=lambda x: x.work_id)
1161         assert works == []
1162         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1163         # print(workflow_str)
1164         assert workflow.is_terminated() is True
1165 
1166     def test_workflow_subworkflow1(self):
1167         work1 = Work(
1168             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1169         )
1170         work2 = Work(
1171             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1172         )
1173 
1174         workflow1 = Workflow()
1175         workflow1.add_work(work1, initial=False)
1176         workflow1.add_work(work2, initial=False)
1177 
1178         work3 = Work(
1179             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1180         )
1181         cond = Condition(cond=work3.is_finished, true_work=workflow1)
1182 
1183         workflow = Workflow()
1184         workflow.add_work(work3, initial=False)
1185         workflow.add_work(workflow1, initial=False)
1186         workflow.add_condition(cond)
1187 
1188         works = workflow.get_new_works()
1189         works.sort(key=lambda x: x.work_id)
1190         assert works == [work3]
1191         # assert(workflow.num_run == 1)
1192 
1193         for work in works:
1194             # if work.work_id == 3:
1195             work.transforming = True
1196             work.status = WorkStatus.Failed
1197 
1198         works = workflow.get_new_works()
1199         works.sort(key=lambda x: x.work_id)
1200         assert works == []
1201         assert workflow.is_terminated() is True
1202 
1203     def test_workflow_subworkflow2(self):
1204         work1 = Work(
1205             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1206         )
1207         work2 = Work(
1208             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1209         )
1210 
1211         workflow1 = Workflow()
1212         workflow1.add_work(work1, initial=False)
1213         workflow1.add_work(work2, initial=False)
1214 
1215         work3 = Work(
1216             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1217         )
1218         cond = Condition(cond=work3.is_finished, true_work=workflow1)
1219 
1220         workflow = Workflow()
1221         workflow.add_work(work3, initial=False)
1222         workflow.add_work(workflow1, initial=False)
1223         workflow.add_condition(cond)
1224 
1225         works = workflow.get_new_works()
1226         works.sort(key=lambda x: x.work_id)
1227         assert works == [work3]
1228         # assert(workflow.num_run == 1)
1229 
1230         for work in works:
1231             # if work.work_id == 3:
1232             work.transforming = True
1233             work.status = WorkStatus.Finished
1234 
1235         works = workflow.get_new_works()
1236         works.sort(key=lambda x: x.work_id)
1237         assert works == [work1, work2]
1238         assert workflow.is_terminated() is False
1239 
1240         for work in works:
1241             work.transforming = True
1242             work.status = WorkStatus.Finished
1243 
1244         works = workflow.get_new_works()
1245         works.sort(key=lambda x: x.work_id)
1246         assert works == []
1247         assert workflow.is_terminated() is True
1248 
1249     def test_workflow_subloopworkflow(self):
1250         work1 = Work(
1251             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1252         )
1253         work2 = Work(
1254             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1255         )
1256 
1257         workflow1 = Workflow()
1258         workflow1.add_work(work1, initial=False)
1259         workflow1.add_work(work2, initial=False)
1260 
1261         cond = Condition(cond=work2.is_finished)
1262         workflow1.add_loop_condition(cond)
1263 
1264         work3 = Work(
1265             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1266         )
1267 
1268         workflow = Workflow()
1269         workflow.add_work(work3, initial=False)
1270         workflow.add_work(workflow1, initial=False)
1271 
1272         works = workflow.get_new_works()
1273         works.sort(key=lambda x: x.work_id)
1274         assert works == [work1, work2, work3]
1275         # assert(workflow1.num_run == 1)
1276 
1277         for work in works:
1278             # if work.work_id == 3:
1279             work.transforming = True
1280             work.status = WorkStatus.Failed
1281         assert workflow.is_terminated() is True
1282 
1283     def test_workflow_subloopworkflow1(self):
1284         work1 = Work(
1285             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1286         )
1287         work2 = Work(
1288             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1289         )
1290 
1291         workflow1 = Workflow()
1292         workflow1.add_work(work1, initial=False)
1293         workflow1.add_work(work2, initial=False)
1294 
1295         cond = Condition(cond=work2.is_finished)
1296         workflow1.add_loop_condition(cond)
1297 
1298         work3 = Work(
1299             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1300         )
1301 
1302         workflow = Workflow()
1303         workflow.add_work(work3, initial=False)
1304         workflow.add_work(workflow1, initial=False)
1305 
1306         works = workflow.get_new_works()
1307         works.sort(key=lambda x: x.work_id)
1308         assert works == [work1, work2, work3]
1309         # assert(workflow1.num_run == 1)
1310 
1311         for work in works:
1312             # if work.work_id == 3:
1313             work.transforming = True
1314             work.status = WorkStatus.Finished
1315         assert workflow.is_terminated() is False
1316 
1317         works = workflow.get_new_works()
1318         works.sort(key=lambda x: x.work_id)
1319         assert works == [work1, work2]
1320         # assert(workflow1.num_run == 1)
1321 
1322         for work in works:
1323             # if work.work_id == 3:
1324             work.transforming = True
1325             work.status = WorkStatus.Failed
1326         works = workflow.get_new_works()
1327         works.sort(key=lambda x: x.work_id)
1328         assert works == []
1329         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1330         # print(workflow_str)
1331         assert workflow.is_terminated() is True
1332 
1333     def test_workflow_subloopworkflow2(self):
1334         work1 = Work(
1335             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1336         )
1337         work2 = Work(
1338             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1339         )
1340 
1341         workflow1 = Workflow()
1342         workflow1.add_work(work1, initial=False)
1343         workflow1.add_work(work2, initial=False)
1344 
1345         cond = Condition(cond=work2.is_finished)
1346         workflow1.add_loop_condition(cond)
1347 
1348         work3 = Work(
1349             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1350         )
1351         cond1 = Condition(cond=work3.is_finished, true_work=workflow1)
1352 
1353         workflow = Workflow()
1354         workflow.add_work(work3, initial=False)
1355         workflow.add_work(workflow1, initial=False)
1356         workflow.add_condition(cond1)
1357 
1358         works = workflow.get_new_works()
1359         works.sort(key=lambda x: x.work_id)
1360         assert works == [work3]
1361         # assert(workflow.num_run == 1)
1362 
1363         for work in works:
1364             # if work.work_id == 3:
1365             work.transforming = True
1366             work.status = WorkStatus.Failed
1367 
1368         works = workflow.get_new_works()
1369         works.sort(key=lambda x: x.work_id)
1370         assert works == []
1371         assert workflow.is_terminated() is True
1372 
1373     def test_workflow_subloopworkflow3(self):
1374         work1 = Work(
1375             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1376         )
1377         work2 = Work(
1378             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1379         )
1380 
1381         workflow1 = Workflow()
1382         workflow1.add_work(work1, initial=False)
1383         workflow1.add_work(work2, initial=False)
1384 
1385         cond = Condition(cond=work2.is_finished)
1386         workflow1.add_loop_condition(cond)
1387 
1388         work3 = Work(
1389             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1390         )
1391         cond1 = Condition(cond=work3.is_finished, true_work=workflow1)
1392 
1393         workflow = Workflow()
1394         workflow.add_work(work3, initial=False)
1395         workflow.add_work(workflow1, initial=False)
1396         workflow.add_condition(cond1)
1397 
1398         works = workflow.get_new_works()
1399         works.sort(key=lambda x: x.work_id)
1400         assert works == [work3]
1401         # assert(workflow.num_run == 1)
1402 
1403         for work in works:
1404             # if work.work_id == 3:
1405             work.transforming = True
1406             work.status = WorkStatus.Finished
1407 
1408         works = workflow.get_new_works()
1409         works.sort(key=lambda x: x.work_id)
1410         assert works == [work1, work2]
1411         assert workflow.is_terminated() is False
1412 
1413         for work in works:
1414             work.transforming = True
1415             work.status = WorkStatus.Finished
1416 
1417         works = workflow.get_new_works()
1418         works.sort(key=lambda x: x.work_id)
1419         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1420         # print(workflow_str)
1421         assert works == [work1, work2]
1422         assert workflow.is_terminated() is False
1423 
1424         works = workflow.get_new_works()
1425         works.sort(key=lambda x: x.work_id)
1426         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1427         # print(workflow_str)
1428         assert works == [work1, work2]
1429         assert workflow.is_terminated() is False
1430 
1431         for work in works:
1432             work.transforming = True
1433             work.status = WorkStatus.Failed
1434 
1435         works = workflow.get_new_works()
1436         works.sort(key=lambda x: x.work_id)
1437         assert works == []
1438         assert workflow.is_terminated() is True
1439 
1440     def test_custom_condition(self):
1441         work1 = Work(
1442             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1443         )
1444         work1.add_custom_condition(key="to_continue", value=True)
1445         assert work1.get_custom_condition_status() is False
1446         # output_data will be set based on the outputs of jobs.
1447         work1.output_data = {"to_continue": True}
1448         assert work1.get_custom_condition_status() is True
1449 
1450     def test_workflow_subloopworkflow4(self):
1451         work1 = Work(
1452             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1453         )
1454         work2 = Work(
1455             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1456         )
1457 
1458         workflow1 = Workflow()
1459         workflow1.add_work(work1, initial=False)
1460         workflow1.add_work(work2, initial=False)
1461 
1462         work2.add_custom_condition(key="to_continue", value=True)
1463         cond = Condition(cond=work2.get_custom_condition_status)
1464         workflow1.add_loop_condition(cond)
1465 
1466         work3 = Work(
1467             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1468         )
1469         cond1 = Condition(cond=work3.is_finished, true_work=workflow1)
1470 
1471         workflow = Workflow()
1472         workflow.add_work(work3, initial=False)
1473         workflow.add_work(workflow1, initial=False)
1474         workflow.add_condition(cond1)
1475 
1476         works = workflow.get_new_works()
1477         works.sort(key=lambda x: x.work_id)
1478         assert works == [work3]
1479         # assert(workflow.num_run == 1)
1480 
1481         for work in works:
1482             # if work.work_id == 3:
1483             work.transforming = True
1484             work.status = WorkStatus.Failed
1485 
1486         works = workflow.get_new_works()
1487         works.sort(key=lambda x: x.work_id)
1488         assert works == []
1489         assert workflow.is_terminated() is True
1490 
1491     def test_workflow_subloopworkflow_reload(self):
1492         work1 = Work(
1493             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
1494         )
1495         work2 = Work(
1496             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
1497         )
1498 
1499         workflow1 = Workflow()
1500         workflow1.add_work(work1, initial=False)
1501         workflow1.add_work(work2, initial=False)
1502 
1503         cond = Condition(cond=work2.is_finished)
1504         workflow1.add_loop_condition(cond)
1505 
1506         work3 = Work(
1507             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
1508         )
1509         cond1 = Condition(cond=work3.is_finished, true_work=workflow1)
1510 
1511         workflow = Workflow()
1512         workflow.add_work(work3, initial=False)
1513         workflow.add_work(workflow1, initial=False)
1514         workflow.add_condition(cond1)
1515 
1516         # reload
1517         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1518         # print(workflow_str)
1519         workflow = json_loads(workflow_str)
1520 
1521         works = workflow.get_new_works()
1522         works.sort(key=lambda x: x.work_id)
1523         assert works == [work3]
1524         # assert(workflow.num_run == 1)
1525 
1526         for work in works:
1527             # if work.work_id == 3:
1528             work.transforming = True
1529             work.status = WorkStatus.Finished
1530 
1531         # reload
1532         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1533         # print(workflow_str)
1534         workflow = json_loads(workflow_str)
1535 
1536         works = workflow.get_new_works()
1537         works.sort(key=lambda x: x.work_id)
1538         assert works == [work1, work2]
1539         assert workflow.is_terminated() is False
1540 
1541         for work in works:
1542             work.transforming = True
1543             work.status = WorkStatus.Finished
1544 
1545         # reload
1546         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1547         # print(workflow_str)
1548         workflow = json_loads(workflow_str)
1549 
1550         works = workflow.get_new_works()
1551         works.sort(key=lambda x: x.work_id)
1552         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1553         # print(workflow_str)
1554         assert works == [work1, work2]
1555         assert workflow.is_terminated() is False
1556 
1557         for work in works:
1558             work.transforming = True
1559             work.status = WorkStatus.Failed
1560 
1561         # reload
1562         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1563         # print(workflow_str)
1564         workflow = json_loads(workflow_str)
1565 
1566         works = workflow.get_new_works()
1567         works.sort(key=lambda x: x.work_id)
1568         assert works == []
1569         assert workflow.is_terminated() is True
1570 
1571     def test_workflow_subloopworkflow_parameter_link(self):
1572         work1 = Work(
1573             executable="/bin/hostname",
1574             arguments=None,
1575             sandbox=None,
1576             work_id=1,
1577             primary_input_collection={
1578                 "scope": "test_scop",
1579                 "name": "input_test_work_1",
1580             },
1581             primary_output_collection={
1582                 "scope": "test_scop",
1583                 "name": "output_test_work_1",
1584             },
1585         )
1586         work2 = Work(
1587             executable="/bin/hostname",
1588             arguments=None,
1589             sandbox=None,
1590             work_id=2,
1591             primary_input_collection={
1592                 "scope": "test_scop",
1593                 "name": "input_test_work_2",
1594             },
1595             primary_output_collection={
1596                 "scope": "test_scop",
1597                 "name": "output_test_work_2",
1598             },
1599         )
1600 
1601         workflow1 = Workflow()
1602         workflow1.add_work(work1, initial=False)
1603         workflow1.add_work(work2, initial=False)
1604 
1605         cond1 = Condition(cond=work1.is_finished, true_work=work2)
1606         workflow1.add_condition(cond1)
1607 
1608         p_link = ParameterLink(
1609             parameters=[
1610                 {
1611                     "source": "primary_output_collection",
1612                     "destination": "primary_input_collection",
1613                 }
1614             ]
1615         )
1616         workflow1.add_parameter_link(work1, work2, p_link)
1617 
1618         works = workflow1.get_new_works()
1619         works.sort(key=lambda x: x.work_id)
1620         assert works == [work1]
1621         # assert(workflow.num_run == 1)
1622         work1_1 = works[0]
1623         assert work1_1.primary_input_collection.name == "input_test_work_1"
1624         assert work1_1.primary_output_collection.name == "output_test_work_1"
1625 
1626         for work in works:
1627             # if work.work_id == 3:
1628             work.transforming = True
1629             work.status = WorkStatus.Finished
1630 
1631         works = workflow1.get_new_works()
1632         works.sort(key=lambda x: x.work_id)
1633         assert works == [work2]
1634         assert workflow1.is_terminated() is False
1635         work2_1 = works[0]
1636         # workflow_str = json_dumps(workflow1, sort_keys=True, indent=4)
1637         # print(workflow_str)
1638 
1639         assert work2_1.primary_input_collection.name == "output_test_work_1"
1640         assert work2_1.primary_output_collection.name == "output_test_work_2"
1641 
1642         for work in works:
1643             # if work.work_id == 3:
1644             work.transforming = True
1645             work.status = WorkStatus.Finished
1646 
1647         works = workflow1.get_new_works()
1648         works.sort(key=lambda x: x.work_id)
1649         assert works == []
1650         assert workflow1.is_terminated() is True
1651 
1652     def test_workflow_subloopworkflow_parameter_link1(self):
1653         work1 = Work(
1654             executable="/bin/hostname",
1655             arguments=None,
1656             sandbox=None,
1657             work_id=1,
1658             primary_input_collection={
1659                 "scope": "test_scop",
1660                 "name": "input_test_work_1",
1661             },
1662             primary_output_collection={
1663                 "scope": "test_scop",
1664                 "name": "output_test_work_1",
1665             },
1666         )
1667         work2 = Work(
1668             executable="/bin/hostname",
1669             arguments=None,
1670             sandbox=None,
1671             work_id=2,
1672             primary_input_collection={
1673                 "scope": "test_scop",
1674                 "name": "input_test_work_2",
1675             },
1676             primary_output_collection={
1677                 "scope": "test_scop",
1678                 "name": "output_test_work_2",
1679             },
1680         )
1681 
1682         workflow1 = Workflow()
1683         workflow1.add_work(work1, initial=False)
1684         workflow1.add_work(work2, initial=False)
1685 
1686         cond1 = Condition(cond=work1.is_finished, true_work=work2)
1687         workflow1.add_condition(cond1)
1688 
1689         p_link = ParameterLink(
1690             parameters=[
1691                 {
1692                     "source": "primary_output_collection",
1693                     "destination": "primary_input_collection",
1694                 }
1695             ]
1696         )
1697         workflow1.add_parameter_link(work1, work2, p_link)
1698 
1699         cond = Condition(cond=work2.is_finished)
1700         workflow1.add_loop_condition(cond)
1701 
1702         works = workflow1.get_new_works()
1703         works.sort(key=lambda x: x.work_id)
1704         assert works == [work1]
1705         # assert(workflow.num_run == 1)
1706         work1_1 = works[0]
1707         assert work1_1.primary_input_collection.name == "input_test_work_1"
1708         assert work1_1.primary_output_collection.name == "output_test_work_1"
1709 
1710         for work in works:
1711             # if work.work_id == 3:
1712             work.transforming = True
1713             work.status = WorkStatus.Finished
1714 
1715         works = workflow1.get_new_works()
1716         works.sort(key=lambda x: x.work_id)
1717         assert works == [work2]
1718         assert workflow1.is_terminated() is False
1719         work2_1 = works[0]
1720         # workflow_str = json_dumps(workflow1, sort_keys=True, indent=4)
1721         # print(workflow_str)
1722 
1723         assert work2_1.primary_input_collection.name == "output_test_work_1"
1724         assert work2_1.primary_output_collection.name == "output_test_work_2"
1725 
1726         for work in works:
1727             # if work.work_id == 3:
1728             work.transforming = True
1729             work.status = WorkStatus.Finished
1730 
1731         works = workflow1.get_new_works()
1732         works.sort(key=lambda x: x.work_id)
1733         assert works == [work1]
1734         assert workflow1.is_terminated() is False
1735         # workflow_str = json_dumps(workflow1, sort_keys=True, indent=4)
1736         # print(workflow_str)
1737         work1_2 = works[0]
1738         assert work1_2.primary_input_collection.name == "input_test_work_1"
1739         assert work1_2.primary_output_collection.name == "output_test_work_1.2"
1740 
1741         for work in works:
1742             # if work.work_id == 3:
1743             work.transforming = True
1744             work.status = WorkStatus.Finished
1745 
1746         works = workflow1.get_new_works()
1747         works.sort(key=lambda x: x.work_id)
1748         assert works == [work2]
1749         assert workflow1.is_terminated() is False
1750         work2_2 = works[0]
1751         # workflow_str = json_dumps(workflow1, sort_keys=True, indent=4)
1752         # print(workflow_str)
1753 
1754         assert work2_2.primary_input_collection.name == "output_test_work_1.2"
1755         assert work2_2.primary_output_collection.name == "output_test_work_2.2"
1756 
1757         for work in works:
1758             # if work.work_id == 3:
1759             work.transforming = True
1760             work.status = WorkStatus.Failed
1761 
1762         works = workflow1.get_new_works()
1763         works.sort(key=lambda x: x.work_id)
1764         assert works == []
1765         assert workflow1.is_terminated() is True
1766 
1767     def test_workflow_subloopworkflow_parameter_link2(self):
1768         work1 = Work(
1769             executable="/bin/hostname",
1770             arguments=None,
1771             sandbox=None,
1772             work_id=1,
1773             primary_input_collection={
1774                 "scope": "test_scop",
1775                 "name": "input_test_work_1",
1776             },
1777             primary_output_collection={
1778                 "scope": "test_scop",
1779                 "name": "output_test_work_1",
1780             },
1781         )
1782         work2 = Work(
1783             executable="/bin/hostname",
1784             arguments=None,
1785             sandbox=None,
1786             work_id=2,
1787             primary_input_collection={
1788                 "scope": "test_scop",
1789                 "name": "input_test_work_2",
1790             },
1791             primary_output_collection={
1792                 "scope": "test_scop",
1793                 "name": "output_test_work_2",
1794             },
1795         )
1796 
1797         workflow1 = Workflow()
1798         workflow1.add_work(work1, initial=False)
1799         workflow1.add_work(work2, initial=False)
1800 
1801         cond1 = Condition(cond=work1.is_finished, true_work=work2)
1802         workflow1.add_condition(cond1)
1803 
1804         p_link = ParameterLink(
1805             parameters=[
1806                 {
1807                     "source": "primary_output_collection",
1808                     "destination": "primary_input_collection",
1809                 }
1810             ]
1811         )
1812         workflow1.add_parameter_link(work1, work2, p_link)
1813 
1814         cond = Condition(cond=work2.is_finished)
1815         workflow1.add_loop_condition(cond)
1816 
1817         work3 = Work(
1818             executable="/bin/hostname",
1819             arguments=None,
1820             sandbox=None,
1821             work_id=3,
1822             primary_input_collection={
1823                 "scope": "test_scop",
1824                 "name": "input_test_work_3",
1825             },
1826             primary_output_collection={
1827                 "scope": "test_scop",
1828                 "name": "output_test_work_3",
1829             },
1830         )
1831         cond2 = Condition(cond=work3.is_finished, true_work=workflow1)
1832         p_link1 = ParameterLink(
1833             parameters=[
1834                 {
1835                     "source": "primary_output_collection",
1836                     "destination": "primary_input_collection",
1837                 }
1838             ]
1839         )
1840 
1841         workflow = Workflow()
1842         workflow.add_work(work3, initial=False)
1843         workflow.add_work(workflow1, initial=False)
1844         workflow.add_condition(cond2)
1845         workflow.add_parameter_link(work3, work1, p_link1)
1846 
1847         works = workflow.get_new_works()
1848         works.sort(key=lambda x: x.work_id)
1849         assert works == [work3]
1850         # assert(workflow.num_run == 1)
1851         work3_1 = works[0]
1852         assert work3_1.primary_input_collection.name == "input_test_work_3"
1853         assert work3_1.primary_output_collection.name == "output_test_work_3"
1854 
1855         for work in works:
1856             # if work.work_id == 3:
1857             work.transforming = True
1858             work.status = WorkStatus.Finished
1859 
1860         works = workflow.get_new_works()
1861         works.sort(key=lambda x: x.work_id)
1862         assert works == [work1]
1863         assert workflow.is_terminated() is False
1864         work1_1 = works[0]
1865         # workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
1866         # print(workflow_str)
1867 
1868         assert work1_1.primary_input_collection.name == "output_test_work_3"
1869         assert work1_1.primary_output_collection.name == "output_test_work_1"
1870 
1871         for work in works:
1872             # if work.work_id == 3:
1873             work.transforming = True
1874             work.status = WorkStatus.Finished
1875 
1876         works = workflow.get_new_works()
1877         works.sort(key=lambda x: x.work_id)
1878         assert works == [work2]
1879         assert workflow1.is_terminated() is False
1880         # workflow_str = json_dumps(workflow1, sort_keys=True, indent=4)
1881         # print(workflow_str)
1882         work2_1 = works[0]
1883         assert work2_1.primary_input_collection.name == "output_test_work_1"
1884         assert work2_1.primary_output_collection.name == "output_test_work_2"
1885 
1886         for work in works:
1887             # if work.work_id == 3:
1888             work.transforming = True
1889             work.status = WorkStatus.Finished
1890 
1891         works = workflow.get_new_works()
1892         works.sort(key=lambda x: x.work_id)
1893         assert works == [work1]
1894         assert workflow1.is_terminated() is False
1895         work1_2 = works[0]
1896         # workflow_str = json_dumps(workflow1, sort_keys=True, indent=4)
1897         # print(workflow_str)
1898 
1899         assert work1_2.primary_input_collection.name == "output_test_work_3"
1900         assert work1_2.primary_output_collection.name == "output_test_work_1.2"
1901 
1902         for work in works:
1903             # if work.work_id == 3:
1904             work.transforming = True
1905             work.status = WorkStatus.Finished
1906 
1907         works = workflow.get_new_works()
1908         works.sort(key=lambda x: x.work_id)
1909         assert works == [work2]
1910         assert workflow1.is_terminated() is False
1911         work2_2 = works[0]
1912         # workflow_str = json_dumps(workflow1, sort_keys=True, indent=4)
1913         # print(workflow_str)
1914 
1915         assert work2_2.primary_input_collection.name == "output_test_work_1.2"
1916         assert work2_2.primary_output_collection.name == "output_test_work_2.2"
1917 
1918         for work in works:
1919             # if work.work_id == 3:
1920             work.transforming = True
1921             work.status = WorkStatus.Failed
1922 
1923         works = workflow.get_new_works()
1924         works.sort(key=lambda x: x.work_id)
1925         assert works == []
1926         assert workflow.is_terminated() is True