Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2021
0010 
0011 
0012 """
0013 Test workflow condtions.
0014 """
0015 
0016 import unittest2 as unittest
0017 
0018 # from nose.tools import assert_equal
0019 from idds.common.utils import setup_logging
0020 
0021 from idds.common.utils import json_dumps, json_loads
0022 
0023 from idds.workflow.work import Work, WorkStatus
0024 from idds.workflow.workflow import (
0025     CompositeCondition,
0026     AndCondition,
0027     OrCondition,
0028     Condition,
0029     ConditionTrigger,
0030     Workflow,
0031 )
0032 
0033 
0034 setup_logging(__name__)
0035 
0036 
0037 class TestWorkflowCondtion(unittest.TestCase):
0038 
0039     def test_condition(self):
0040         # init_p = Parameter({'input_dataset': 'data17:data17.test.raw.1'})
0041         work1 = Work(
0042             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0043         )
0044         work2 = Work(
0045             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
0046         )
0047         work3 = Work(
0048             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
0049         )
0050         work4 = Work(
0051             executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
0052         )
0053         work5 = Work(
0054             executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
0055         )
0056         work6 = Work(
0057             executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
0058         )
0059         work7 = Work(
0060             executable="echo",
0061             arguments="--in=IN_DATASET --out=OUT_DATASET",
0062             sandbox=None,
0063             work_id=7,
0064             primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
0065             output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
0066         )
0067         work8 = Work(
0068             executable="echo",
0069             arguments="--in=IN_DATASET --out=OUT_DATASET",
0070             sandbox=None,
0071             work_id=8,
0072             primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
0073             output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
0074         )
0075 
0076         workflow = Workflow()
0077         workflow.add_work(work1, initial=True)
0078         workflow.add_work(work2, initial=True)
0079         workflow.add_work(work3, initial=False)
0080         workflow.add_work(work8, initial=False)
0081 
0082         # CompositeCondition
0083         cond1 = CompositeCondition(
0084             conditions=work1.is_finished, true_works=work2, false_works=work3
0085         )
0086         works = cond1.all_works()
0087         assert works == [work1, work2, work3]
0088         works = cond1.all_pre_works()
0089         assert works == [work1]
0090         works = cond1.all_next_works()
0091         assert works == [work2, work3]
0092         cond_status = cond1.get_condition_status()
0093         assert cond_status is False
0094 
0095         work1.status = WorkStatus.Finished
0096         cond_status = cond1.get_condition_status()
0097         assert cond_status is True
0098         work1.status = WorkStatus.New
0099 
0100         works = cond1.get_next_works(trigger=ConditionTrigger.NotTriggered)
0101         assert works == [work3]
0102         work1.status = WorkStatus.Finished
0103         works = cond1.get_next_works(trigger=ConditionTrigger.NotTriggered)
0104         assert works == [work2]
0105         work1.status = WorkStatus.New
0106 
0107         works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
0108         assert works == [work3]
0109         works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
0110         assert works == []
0111         work1.status = WorkStatus.Finished
0112         works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
0113         assert works == [work2]
0114         works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
0115         assert works == []
0116         work1.status = WorkStatus.New
0117 
0118         works = cond1.get_next_works(trigger=ConditionTrigger.Triggered)
0119         assert works == [work3]
0120         work1.status = WorkStatus.Finished
0121         works = cond1.get_next_works(trigger=ConditionTrigger.Triggered)
0122         assert works == [work2]
0123         work1.status = WorkStatus.New
0124 
0125         # CompositeCondition
0126         cond2 = CompositeCondition(
0127             conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
0128             true_works=[work4, work5],
0129             false_works=[work6, work7],
0130         )
0131 
0132         works = cond2.all_works()
0133         assert works == [work1, work2, work3, work4, work5, work6, work7]
0134         works = cond2.all_pre_works()
0135         assert works == [work1, work2, work3]
0136         works = cond2.all_next_works()
0137         assert works == [work4, work5, work6, work7]
0138         cond_status = cond2.get_condition_status()
0139         assert cond_status is False
0140 
0141         work1.status = WorkStatus.Finished
0142         cond_status = cond2.get_condition_status()
0143         assert cond_status is False
0144         work2.status = WorkStatus.Finished
0145         cond_status = cond2.get_condition_status()
0146         assert cond_status is False
0147         work3.status = WorkStatus.Finished
0148         cond_status = cond2.get_condition_status()
0149         assert cond_status is True
0150         work1.status = WorkStatus.New
0151         work2.status = WorkStatus.New
0152         work3.status = WorkStatus.New
0153 
0154         works = cond2.get_next_works(trigger=ConditionTrigger.NotTriggered)
0155         assert works == [work6, work7]
0156         work1.status = WorkStatus.Finished
0157         work2.status = WorkStatus.Finished
0158         work3.status = WorkStatus.Finished
0159         works = cond2.get_next_works(trigger=ConditionTrigger.NotTriggered)
0160         assert works == [work4, work5]
0161         work1.status = WorkStatus.New
0162         work2.status = WorkStatus.New
0163         work3.status = WorkStatus.New
0164 
0165         works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
0166         assert works == [work6, work7]
0167         works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
0168         assert works == []
0169         work1.status = WorkStatus.Finished
0170         work2.status = WorkStatus.Finished
0171         work3.status = WorkStatus.Finished
0172         works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
0173         assert works == [work4, work5]
0174         works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
0175         assert works == []
0176         work1.status = WorkStatus.New
0177         work2.status = WorkStatus.New
0178         work3.status = WorkStatus.New
0179 
0180         works = cond2.get_next_works(trigger=ConditionTrigger.Triggered)
0181         assert works == [work6, work7]
0182         work1.status = WorkStatus.Finished
0183         work2.status = WorkStatus.Finished
0184         work3.status = WorkStatus.Finished
0185         works = cond2.get_next_works(trigger=ConditionTrigger.Triggered)
0186         assert works == [work4, work5]
0187         work1.status = WorkStatus.New
0188         work2.status = WorkStatus.New
0189         work3.status = WorkStatus.New
0190 
0191         # AndCondition
0192         cond3 = AndCondition(
0193             conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
0194             true_works=[work4, work5],
0195             false_works=[work6, work7],
0196         )
0197 
0198         works = cond3.all_works()
0199         assert works == [work1, work2, work3, work4, work5, work6, work7]
0200         works = cond3.all_pre_works()
0201         assert works == [work1, work2, work3]
0202         works = cond3.all_next_works()
0203         assert works == [work4, work5, work6, work7]
0204         cond_status = cond3.get_condition_status()
0205         assert cond_status is False
0206 
0207         work1.status = WorkStatus.Finished
0208         cond_status = cond3.get_condition_status()
0209         assert cond_status is False
0210         work2.status = WorkStatus.Finished
0211         cond_status = cond3.get_condition_status()
0212         assert cond_status is False
0213         work3.status = WorkStatus.Finished
0214         cond_status = cond3.get_condition_status()
0215         assert cond_status is True
0216         work1.status = WorkStatus.New
0217         work2.status = WorkStatus.New
0218         work3.status = WorkStatus.New
0219 
0220         works = cond3.get_next_works(trigger=ConditionTrigger.NotTriggered)
0221         assert works == [work6, work7]
0222         work1.status = WorkStatus.Finished
0223         work2.status = WorkStatus.Finished
0224         work3.status = WorkStatus.Finished
0225         works = cond3.get_next_works(trigger=ConditionTrigger.NotTriggered)
0226         assert works == [work4, work5]
0227         work1.status = WorkStatus.New
0228         work2.status = WorkStatus.New
0229         work3.status = WorkStatus.New
0230 
0231         works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
0232         assert works == [work6, work7]
0233         works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
0234         assert works == []
0235         work1.status = WorkStatus.Finished
0236         work2.status = WorkStatus.Finished
0237         work3.status = WorkStatus.Finished
0238         works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
0239         assert works == [work4, work5]
0240         works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
0241         assert works == []
0242         work1.status = WorkStatus.New
0243         work2.status = WorkStatus.New
0244         work3.status = WorkStatus.New
0245 
0246         works = cond3.get_next_works(trigger=ConditionTrigger.Triggered)
0247         assert works == [work6, work7]
0248         work1.status = WorkStatus.Finished
0249         work2.status = WorkStatus.Finished
0250         work3.status = WorkStatus.Finished
0251         works = cond3.get_next_works(trigger=ConditionTrigger.Triggered)
0252         assert works == [work4, work5]
0253         work1.status = WorkStatus.New
0254         work2.status = WorkStatus.New
0255         work3.status = WorkStatus.New
0256 
0257         # OrCondtion
0258         cond4 = OrCondition(
0259             conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
0260             true_works=[work4, work5],
0261             false_works=[work6, work7],
0262         )
0263 
0264         works = cond4.all_works()
0265         assert works == [work1, work2, work3, work4, work5, work6, work7]
0266         works = cond4.all_pre_works()
0267         assert works == [work1, work2, work3]
0268         works = cond4.all_next_works()
0269         assert works == [work4, work5, work6, work7]
0270         cond_status = cond4.get_condition_status()
0271         assert cond_status is False
0272 
0273         work1.status = WorkStatus.Finished
0274         cond_status = cond4.get_condition_status()
0275         assert cond_status is True
0276         work1.status = WorkStatus.New
0277         work2.status = WorkStatus.Finished
0278         cond_status = cond4.get_condition_status()
0279         assert cond_status is True
0280         work2.status = WorkStatus.New
0281         work3.status = WorkStatus.Finished
0282         cond_status = cond4.get_condition_status()
0283         assert cond_status is True
0284         work1.status = WorkStatus.New
0285         work2.status = WorkStatus.New
0286         work3.status = WorkStatus.New
0287 
0288         works = cond4.get_next_works(trigger=ConditionTrigger.NotTriggered)
0289         assert works == [work6, work7]
0290         work1.status = WorkStatus.Finished
0291         # work2.status = WorkStatus.Finished
0292         # work3.status = WorkStatus.Finished
0293         works = cond4.get_next_works(trigger=ConditionTrigger.NotTriggered)
0294         assert works == [work4, work5]
0295         work1.status = WorkStatus.New
0296         work2.status = WorkStatus.New
0297         work3.status = WorkStatus.New
0298 
0299         works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
0300         assert works == [work6, work7]
0301         works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
0302         assert works == []
0303         work1.status = WorkStatus.Finished
0304         # work2.status = WorkStatus.Finished
0305         # work3.status = WorkStatus.Finished
0306         works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
0307         assert works == [work4, work5]
0308         works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
0309         assert works == []
0310         work1.status = WorkStatus.New
0311         work2.status = WorkStatus.New
0312         work3.status = WorkStatus.New
0313 
0314         works = cond4.get_next_works(trigger=ConditionTrigger.Triggered)
0315         assert works == [work6, work7]
0316         work1.status = WorkStatus.Finished
0317         # work2.status = WorkStatus.Finished
0318         # work3.status = WorkStatus.Finished
0319         works = cond4.get_next_works(trigger=ConditionTrigger.Triggered)
0320         assert works == [work4, work5]
0321         work1.status = WorkStatus.New
0322         work2.status = WorkStatus.New
0323         work3.status = WorkStatus.New
0324 
0325         # Condition
0326         cond5 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0327 
0328         works = cond5.all_works()
0329         assert works == [work1, work2, work3]
0330         works = cond5.all_pre_works()
0331         assert works == [work1]
0332         works = cond5.all_next_works()
0333         assert works == [work2, work3]
0334         cond_status = cond5.get_condition_status()
0335         assert cond_status is False
0336 
0337         work1.status = WorkStatus.Finished
0338         cond_status = cond5.get_condition_status()
0339         assert cond_status is True
0340         work1.status = WorkStatus.New
0341 
0342         works = cond5.get_next_works(trigger=ConditionTrigger.NotTriggered)
0343         assert works == [work3]
0344         work1.status = WorkStatus.Finished
0345         works = cond5.get_next_works(trigger=ConditionTrigger.NotTriggered)
0346         assert works == [work2]
0347         work1.status = WorkStatus.New
0348 
0349         works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
0350         assert works == [work3]
0351         works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
0352         assert works == []
0353         work1.status = WorkStatus.Finished
0354         works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
0355         assert works == [work2]
0356         works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
0357         assert works == []
0358         work1.status = WorkStatus.New
0359 
0360         works = cond5.get_next_works(trigger=ConditionTrigger.Triggered)
0361         assert works == [work3]
0362         work1.status = WorkStatus.Finished
0363         works = cond5.get_next_works(trigger=ConditionTrigger.Triggered)
0364         assert works == [work2]
0365         work1.status = WorkStatus.New
0366 
0367         # multiple conditions
0368         cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0369         cond7 = CompositeCondition(
0370             conditions=[work4.is_finished, work5.is_finished],
0371             true_works=[work6, cond6],
0372             false_works=work7,
0373         )
0374 
0375         works = cond7.all_works()
0376         works.sort(key=lambda x: x.work_id)
0377         assert works == [work1, work2, work3, work4, work5, work6, work7]
0378         works = cond7.all_pre_works()
0379         works.sort(key=lambda x: x.work_id)
0380         assert works == [work1, work4, work5]
0381         works = cond7.all_next_works()
0382         works.sort(key=lambda x: x.work_id)
0383         # print([w.work_id for w in works])
0384         assert works == [work2, work3, work6, work7]
0385         cond_status = cond7.get_condition_status()
0386         assert cond_status is False
0387 
0388         work4.status = WorkStatus.Finished
0389         cond_status = cond7.get_condition_status()
0390         assert cond_status is False
0391         work5.status = WorkStatus.Finished
0392         cond_status = cond7.get_condition_status()
0393         assert cond_status is True
0394         work4.status = WorkStatus.New
0395         work5.status = WorkStatus.New
0396 
0397         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0398         assert works == [work7]
0399         work4.status = WorkStatus.Finished
0400         work5.status = WorkStatus.Finished
0401         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0402         works.sort(key=lambda x: x.work_id)
0403         assert works == [work3, work6]
0404         work1.status = WorkStatus.Finished
0405         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0406         works.sort(key=lambda x: x.work_id)
0407         assert works == [work2, work6]
0408         work4.status = WorkStatus.New
0409         work5.status = WorkStatus.New
0410         work1.status = WorkStatus.New
0411 
0412         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0413         assert works == [work7]
0414         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0415         assert works == []
0416         work4.status = WorkStatus.Finished
0417         work5.status = WorkStatus.Finished
0418         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0419         works.sort(key=lambda x: x.work_id)
0420         assert works == [work3, work6]
0421         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0422         assert works == []
0423         work1.status = WorkStatus.Finished
0424         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0425         works.sort(key=lambda x: x.work_id)
0426         assert works == [work2]
0427         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0428         works.sort(key=lambda x: x.work_id)
0429         assert works == []
0430         work4.status = WorkStatus.New
0431         work5.status = WorkStatus.New
0432         work1.status = WorkStatus.New
0433 
0434         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0435         assert works == [work7]
0436         work4.status = WorkStatus.Finished
0437         work5.status = WorkStatus.Finished
0438         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0439         works.sort(key=lambda x: x.work_id)
0440         assert works == [work3, work6]
0441         work1.status = WorkStatus.Finished
0442         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0443         works.sort(key=lambda x: x.work_id)
0444         assert works == [work2, work6]
0445         work4.status = WorkStatus.New
0446         work5.status = WorkStatus.New
0447         work1.status = WorkStatus.New
0448 
0449         return workflow
0450 
0451     def print_workflow(self, workflow):
0452         print("print workflow")
0453         print(workflow.conditions)
0454         for cond_id in workflow.conditions:
0455             print(cond_id)
0456             cond = workflow.conditions[cond_id]
0457             print(cond)
0458             print(cond.conditions)
0459             print(cond.true_works)
0460             print(cond.false_works)
0461             for w in cond.true_works:
0462                 print(w)
0463                 if isinstance(w, CompositeCondition):
0464                     print(w.conditions)
0465                     print(w.true_works)
0466                     print(w.false_works)
0467 
0468     def test_workflow(self):
0469         work1 = Work(
0470             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0471         )
0472         work2 = Work(
0473             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
0474         )
0475         work3 = Work(
0476             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
0477         )
0478         work4 = Work(
0479             executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
0480         )
0481         work5 = Work(
0482             executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
0483         )
0484         work6 = Work(
0485             executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
0486         )
0487         work7 = Work(
0488             executable="echo",
0489             arguments="--in=IN_DATASET --out=OUT_DATASET",
0490             sandbox=None,
0491             work_id=7,
0492             primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
0493             output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
0494         )
0495         work8 = Work(
0496             executable="echo",
0497             arguments="--in=IN_DATASET --out=OUT_DATASET",
0498             sandbox=None,
0499             work_id=8,
0500             primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
0501             output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
0502         )
0503 
0504         workflow = Workflow()
0505         workflow.add_work(work1, initial=False)
0506         workflow.add_work(work2, initial=False)
0507         workflow.add_work(work3, initial=False)
0508         workflow.add_work(work4, initial=False)
0509         workflow.add_work(work5, initial=False)
0510         workflow.add_work(work6, initial=False)
0511         workflow.add_work(work7, initial=False)
0512         workflow.add_work(work8, initial=False)
0513 
0514         # multiple conditions
0515         cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0516         cond7 = CompositeCondition(
0517             conditions=[work4.is_finished, work5.is_finished],
0518             true_works=[work6, cond6],
0519             false_works=work7,
0520         )
0521 
0522         workflow.add_condition(cond7)
0523         id_works = workflow.independent_works
0524         # print(id_works)
0525         id_works.sort()
0526         id_works_1 = [work1, work4, work5, work8]
0527         id_works_1 = [w.get_template_id() for w in id_works_1]
0528         id_works_1.sort()
0529         # id_works.sort(key=lambda x: x.work_id)
0530         assert id_works == id_works_1
0531 
0532         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0533         # print(workflow_str)
0534         workflow1 = json_loads(workflow_str)
0535         # print('before load_metadata')
0536         # self.print_workflow(workflow1)
0537         workflow1.load_metadata()
0538         # print('after load_metadata')
0539         # self.print_workflow(workflow1)
0540         workflow_str1 = json_dumps(workflow1, sort_keys=True, indent=4)
0541         assert workflow_str == workflow_str1
0542 
0543         works = cond7.all_works()
0544         works.sort(key=lambda x: x.work_id)
0545         assert works == [work1, work2, work3, work4, work5, work6, work7]
0546         works = cond7.all_pre_works()
0547         works.sort(key=lambda x: x.work_id)
0548         assert works == [work1, work4, work5]
0549         works = cond7.all_next_works()
0550         works.sort(key=lambda x: x.work_id)
0551         # print([w.work_id for w in works])
0552         assert works == [work2, work3, work6, work7]
0553         cond_status = cond7.get_condition_status()
0554         assert cond_status is False
0555 
0556         work4.status = WorkStatus.Finished
0557         cond_status = cond7.get_condition_status()
0558         assert cond_status is False
0559         work5.status = WorkStatus.Finished
0560         cond_status = cond7.get_condition_status()
0561         assert cond_status is True
0562         work4.status = WorkStatus.New
0563         work5.status = WorkStatus.New
0564 
0565         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0566         assert works == [work7]
0567         work4.status = WorkStatus.Finished
0568         work5.status = WorkStatus.Finished
0569         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0570         works.sort(key=lambda x: x.work_id)
0571         assert works == [work3, work6]
0572         work1.status = WorkStatus.Finished
0573         works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0574         works.sort(key=lambda x: x.work_id)
0575         assert works == [work2, work6]
0576         work4.status = WorkStatus.New
0577         work5.status = WorkStatus.New
0578         work1.status = WorkStatus.New
0579 
0580         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0581         assert works == [work7]
0582         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0583         assert works == []
0584         work4.status = WorkStatus.Finished
0585         work5.status = WorkStatus.Finished
0586         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0587         works.sort(key=lambda x: x.work_id)
0588         assert works == [work3, work6]
0589         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0590         assert works == []
0591         work1.status = WorkStatus.Finished
0592         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0593         works.sort(key=lambda x: x.work_id)
0594         assert works == [work2]
0595         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0596         works.sort(key=lambda x: x.work_id)
0597         assert works == []
0598         work4.status = WorkStatus.New
0599         work5.status = WorkStatus.New
0600         work1.status = WorkStatus.New
0601 
0602         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0603         assert works == [work7]
0604         work4.status = WorkStatus.Finished
0605         work5.status = WorkStatus.Finished
0606         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0607         works.sort(key=lambda x: x.work_id)
0608         assert works == [work3, work6]
0609         work1.status = WorkStatus.Finished
0610         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0611         works.sort(key=lambda x: x.work_id)
0612         assert works == [work2, work6]
0613         work4.status = WorkStatus.New
0614         work5.status = WorkStatus.New
0615         work1.status = WorkStatus.New
0616 
0617         return workflow
0618 
0619     def test_workflow_condition_reload(self):
0620         work1 = Work(
0621             executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0622         )
0623         work2 = Work(
0624             executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
0625         )
0626         work3 = Work(
0627             executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
0628         )
0629         work4 = Work(
0630             executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
0631         )
0632         work5 = Work(
0633             executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
0634         )
0635         work6 = Work(
0636             executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
0637         )
0638         work7 = Work(
0639             executable="echo",
0640             arguments="--in=IN_DATASET --out=OUT_DATASET",
0641             sandbox=None,
0642             work_id=7,
0643             primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
0644             output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
0645         )
0646         work8 = Work(
0647             executable="echo",
0648             arguments="--in=IN_DATASET --out=OUT_DATASET",
0649             sandbox=None,
0650             work_id=8,
0651             primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
0652             output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
0653         )
0654 
0655         workflow = Workflow()
0656         workflow.add_work(work1, initial=False)
0657         workflow.add_work(work2, initial=False)
0658         workflow.add_work(work3, initial=False)
0659         workflow.add_work(work4, initial=False)
0660         workflow.add_work(work5, initial=False)
0661         workflow.add_work(work6, initial=False)
0662         workflow.add_work(work7, initial=False)
0663         workflow.add_work(work8, initial=False)
0664 
0665         # multiple conditions
0666         cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0667         cond7 = CompositeCondition(
0668             conditions=[work4.is_finished, work5.is_finished],
0669             true_works=[work6, cond6],
0670             false_works=work7,
0671         )
0672 
0673         workflow.add_condition(cond7)
0674 
0675         workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0676         # print(workflow_str)
0677         workflow1 = json_loads(workflow_str)
0678         # print('before load_metadata')
0679         # self.print_workflow(workflow1)
0680         workflow1.load_metadata()
0681         # print('after load_metadata')
0682         # self.print_workflow(workflow1)
0683         workflow_str1 = json_dumps(workflow1, sort_keys=True, indent=4)
0684         assert workflow_str == workflow_str1
0685 
0686         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0687         assert works == [work7]
0688         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0689         assert works == []
0690         work4.status = WorkStatus.Finished
0691         work5.status = WorkStatus.Finished
0692         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0693         works.sort(key=lambda x: x.work_id)
0694         assert works == [work3, work6]
0695         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0696         assert works == []
0697         work1.status = WorkStatus.Finished
0698         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0699         works.sort(key=lambda x: x.work_id)
0700         assert works == [work2]
0701         works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0702         works.sort(key=lambda x: x.work_id)
0703         assert works == []
0704         work4.status = WorkStatus.New
0705         work5.status = WorkStatus.New
0706         work1.status = WorkStatus.New
0707 
0708         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0709         assert works == [work7]
0710         work4.status = WorkStatus.Finished
0711         work5.status = WorkStatus.Finished
0712         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0713         works.sort(key=lambda x: x.work_id)
0714         assert works == [work3, work6]
0715         work1.status = WorkStatus.Finished
0716         works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0717         works.sort(key=lambda x: x.work_id)
0718         assert works == [work2, work6]
0719         work4.status = WorkStatus.New
0720         work5.status = WorkStatus.New
0721         work1.status = WorkStatus.New
0722 
0723         return workflow