File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test workflow condtions.
0014 """
0015
0016 import unittest2 as unittest
0017
0018
0019 from idds.common.utils import setup_logging
0020
0021 from idds.common.utils import json_dumps, json_loads
0022
0023 from idds.workflow.work import Work, WorkStatus
0024 from idds.workflow.workflow import (
0025 CompositeCondition,
0026 AndCondition,
0027 OrCondition,
0028 Condition,
0029 ConditionTrigger,
0030 Workflow,
0031 )
0032
0033
0034 setup_logging(__name__)
0035
0036
0037 class TestWorkflowCondtion(unittest.TestCase):
0038
0039 def test_condition(self):
0040
0041 work1 = Work(
0042 executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0043 )
0044 work2 = Work(
0045 executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
0046 )
0047 work3 = Work(
0048 executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
0049 )
0050 work4 = Work(
0051 executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
0052 )
0053 work5 = Work(
0054 executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
0055 )
0056 work6 = Work(
0057 executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
0058 )
0059 work7 = Work(
0060 executable="echo",
0061 arguments="--in=IN_DATASET --out=OUT_DATASET",
0062 sandbox=None,
0063 work_id=7,
0064 primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
0065 output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
0066 )
0067 work8 = Work(
0068 executable="echo",
0069 arguments="--in=IN_DATASET --out=OUT_DATASET",
0070 sandbox=None,
0071 work_id=8,
0072 primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
0073 output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
0074 )
0075
0076 workflow = Workflow()
0077 workflow.add_work(work1, initial=True)
0078 workflow.add_work(work2, initial=True)
0079 workflow.add_work(work3, initial=False)
0080 workflow.add_work(work8, initial=False)
0081
0082
0083 cond1 = CompositeCondition(
0084 conditions=work1.is_finished, true_works=work2, false_works=work3
0085 )
0086 works = cond1.all_works()
0087 assert works == [work1, work2, work3]
0088 works = cond1.all_pre_works()
0089 assert works == [work1]
0090 works = cond1.all_next_works()
0091 assert works == [work2, work3]
0092 cond_status = cond1.get_condition_status()
0093 assert cond_status is False
0094
0095 work1.status = WorkStatus.Finished
0096 cond_status = cond1.get_condition_status()
0097 assert cond_status is True
0098 work1.status = WorkStatus.New
0099
0100 works = cond1.get_next_works(trigger=ConditionTrigger.NotTriggered)
0101 assert works == [work3]
0102 work1.status = WorkStatus.Finished
0103 works = cond1.get_next_works(trigger=ConditionTrigger.NotTriggered)
0104 assert works == [work2]
0105 work1.status = WorkStatus.New
0106
0107 works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
0108 assert works == [work3]
0109 works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
0110 assert works == []
0111 work1.status = WorkStatus.Finished
0112 works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
0113 assert works == [work2]
0114 works = cond1.get_next_works(trigger=ConditionTrigger.ToTrigger)
0115 assert works == []
0116 work1.status = WorkStatus.New
0117
0118 works = cond1.get_next_works(trigger=ConditionTrigger.Triggered)
0119 assert works == [work3]
0120 work1.status = WorkStatus.Finished
0121 works = cond1.get_next_works(trigger=ConditionTrigger.Triggered)
0122 assert works == [work2]
0123 work1.status = WorkStatus.New
0124
0125
0126 cond2 = CompositeCondition(
0127 conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
0128 true_works=[work4, work5],
0129 false_works=[work6, work7],
0130 )
0131
0132 works = cond2.all_works()
0133 assert works == [work1, work2, work3, work4, work5, work6, work7]
0134 works = cond2.all_pre_works()
0135 assert works == [work1, work2, work3]
0136 works = cond2.all_next_works()
0137 assert works == [work4, work5, work6, work7]
0138 cond_status = cond2.get_condition_status()
0139 assert cond_status is False
0140
0141 work1.status = WorkStatus.Finished
0142 cond_status = cond2.get_condition_status()
0143 assert cond_status is False
0144 work2.status = WorkStatus.Finished
0145 cond_status = cond2.get_condition_status()
0146 assert cond_status is False
0147 work3.status = WorkStatus.Finished
0148 cond_status = cond2.get_condition_status()
0149 assert cond_status is True
0150 work1.status = WorkStatus.New
0151 work2.status = WorkStatus.New
0152 work3.status = WorkStatus.New
0153
0154 works = cond2.get_next_works(trigger=ConditionTrigger.NotTriggered)
0155 assert works == [work6, work7]
0156 work1.status = WorkStatus.Finished
0157 work2.status = WorkStatus.Finished
0158 work3.status = WorkStatus.Finished
0159 works = cond2.get_next_works(trigger=ConditionTrigger.NotTriggered)
0160 assert works == [work4, work5]
0161 work1.status = WorkStatus.New
0162 work2.status = WorkStatus.New
0163 work3.status = WorkStatus.New
0164
0165 works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
0166 assert works == [work6, work7]
0167 works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
0168 assert works == []
0169 work1.status = WorkStatus.Finished
0170 work2.status = WorkStatus.Finished
0171 work3.status = WorkStatus.Finished
0172 works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
0173 assert works == [work4, work5]
0174 works = cond2.get_next_works(trigger=ConditionTrigger.ToTrigger)
0175 assert works == []
0176 work1.status = WorkStatus.New
0177 work2.status = WorkStatus.New
0178 work3.status = WorkStatus.New
0179
0180 works = cond2.get_next_works(trigger=ConditionTrigger.Triggered)
0181 assert works == [work6, work7]
0182 work1.status = WorkStatus.Finished
0183 work2.status = WorkStatus.Finished
0184 work3.status = WorkStatus.Finished
0185 works = cond2.get_next_works(trigger=ConditionTrigger.Triggered)
0186 assert works == [work4, work5]
0187 work1.status = WorkStatus.New
0188 work2.status = WorkStatus.New
0189 work3.status = WorkStatus.New
0190
0191
0192 cond3 = AndCondition(
0193 conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
0194 true_works=[work4, work5],
0195 false_works=[work6, work7],
0196 )
0197
0198 works = cond3.all_works()
0199 assert works == [work1, work2, work3, work4, work5, work6, work7]
0200 works = cond3.all_pre_works()
0201 assert works == [work1, work2, work3]
0202 works = cond3.all_next_works()
0203 assert works == [work4, work5, work6, work7]
0204 cond_status = cond3.get_condition_status()
0205 assert cond_status is False
0206
0207 work1.status = WorkStatus.Finished
0208 cond_status = cond3.get_condition_status()
0209 assert cond_status is False
0210 work2.status = WorkStatus.Finished
0211 cond_status = cond3.get_condition_status()
0212 assert cond_status is False
0213 work3.status = WorkStatus.Finished
0214 cond_status = cond3.get_condition_status()
0215 assert cond_status is True
0216 work1.status = WorkStatus.New
0217 work2.status = WorkStatus.New
0218 work3.status = WorkStatus.New
0219
0220 works = cond3.get_next_works(trigger=ConditionTrigger.NotTriggered)
0221 assert works == [work6, work7]
0222 work1.status = WorkStatus.Finished
0223 work2.status = WorkStatus.Finished
0224 work3.status = WorkStatus.Finished
0225 works = cond3.get_next_works(trigger=ConditionTrigger.NotTriggered)
0226 assert works == [work4, work5]
0227 work1.status = WorkStatus.New
0228 work2.status = WorkStatus.New
0229 work3.status = WorkStatus.New
0230
0231 works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
0232 assert works == [work6, work7]
0233 works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
0234 assert works == []
0235 work1.status = WorkStatus.Finished
0236 work2.status = WorkStatus.Finished
0237 work3.status = WorkStatus.Finished
0238 works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
0239 assert works == [work4, work5]
0240 works = cond3.get_next_works(trigger=ConditionTrigger.ToTrigger)
0241 assert works == []
0242 work1.status = WorkStatus.New
0243 work2.status = WorkStatus.New
0244 work3.status = WorkStatus.New
0245
0246 works = cond3.get_next_works(trigger=ConditionTrigger.Triggered)
0247 assert works == [work6, work7]
0248 work1.status = WorkStatus.Finished
0249 work2.status = WorkStatus.Finished
0250 work3.status = WorkStatus.Finished
0251 works = cond3.get_next_works(trigger=ConditionTrigger.Triggered)
0252 assert works == [work4, work5]
0253 work1.status = WorkStatus.New
0254 work2.status = WorkStatus.New
0255 work3.status = WorkStatus.New
0256
0257
0258 cond4 = OrCondition(
0259 conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
0260 true_works=[work4, work5],
0261 false_works=[work6, work7],
0262 )
0263
0264 works = cond4.all_works()
0265 assert works == [work1, work2, work3, work4, work5, work6, work7]
0266 works = cond4.all_pre_works()
0267 assert works == [work1, work2, work3]
0268 works = cond4.all_next_works()
0269 assert works == [work4, work5, work6, work7]
0270 cond_status = cond4.get_condition_status()
0271 assert cond_status is False
0272
0273 work1.status = WorkStatus.Finished
0274 cond_status = cond4.get_condition_status()
0275 assert cond_status is True
0276 work1.status = WorkStatus.New
0277 work2.status = WorkStatus.Finished
0278 cond_status = cond4.get_condition_status()
0279 assert cond_status is True
0280 work2.status = WorkStatus.New
0281 work3.status = WorkStatus.Finished
0282 cond_status = cond4.get_condition_status()
0283 assert cond_status is True
0284 work1.status = WorkStatus.New
0285 work2.status = WorkStatus.New
0286 work3.status = WorkStatus.New
0287
0288 works = cond4.get_next_works(trigger=ConditionTrigger.NotTriggered)
0289 assert works == [work6, work7]
0290 work1.status = WorkStatus.Finished
0291
0292
0293 works = cond4.get_next_works(trigger=ConditionTrigger.NotTriggered)
0294 assert works == [work4, work5]
0295 work1.status = WorkStatus.New
0296 work2.status = WorkStatus.New
0297 work3.status = WorkStatus.New
0298
0299 works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
0300 assert works == [work6, work7]
0301 works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
0302 assert works == []
0303 work1.status = WorkStatus.Finished
0304
0305
0306 works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
0307 assert works == [work4, work5]
0308 works = cond4.get_next_works(trigger=ConditionTrigger.ToTrigger)
0309 assert works == []
0310 work1.status = WorkStatus.New
0311 work2.status = WorkStatus.New
0312 work3.status = WorkStatus.New
0313
0314 works = cond4.get_next_works(trigger=ConditionTrigger.Triggered)
0315 assert works == [work6, work7]
0316 work1.status = WorkStatus.Finished
0317
0318
0319 works = cond4.get_next_works(trigger=ConditionTrigger.Triggered)
0320 assert works == [work4, work5]
0321 work1.status = WorkStatus.New
0322 work2.status = WorkStatus.New
0323 work3.status = WorkStatus.New
0324
0325
0326 cond5 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0327
0328 works = cond5.all_works()
0329 assert works == [work1, work2, work3]
0330 works = cond5.all_pre_works()
0331 assert works == [work1]
0332 works = cond5.all_next_works()
0333 assert works == [work2, work3]
0334 cond_status = cond5.get_condition_status()
0335 assert cond_status is False
0336
0337 work1.status = WorkStatus.Finished
0338 cond_status = cond5.get_condition_status()
0339 assert cond_status is True
0340 work1.status = WorkStatus.New
0341
0342 works = cond5.get_next_works(trigger=ConditionTrigger.NotTriggered)
0343 assert works == [work3]
0344 work1.status = WorkStatus.Finished
0345 works = cond5.get_next_works(trigger=ConditionTrigger.NotTriggered)
0346 assert works == [work2]
0347 work1.status = WorkStatus.New
0348
0349 works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
0350 assert works == [work3]
0351 works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
0352 assert works == []
0353 work1.status = WorkStatus.Finished
0354 works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
0355 assert works == [work2]
0356 works = cond5.get_next_works(trigger=ConditionTrigger.ToTrigger)
0357 assert works == []
0358 work1.status = WorkStatus.New
0359
0360 works = cond5.get_next_works(trigger=ConditionTrigger.Triggered)
0361 assert works == [work3]
0362 work1.status = WorkStatus.Finished
0363 works = cond5.get_next_works(trigger=ConditionTrigger.Triggered)
0364 assert works == [work2]
0365 work1.status = WorkStatus.New
0366
0367
0368 cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0369 cond7 = CompositeCondition(
0370 conditions=[work4.is_finished, work5.is_finished],
0371 true_works=[work6, cond6],
0372 false_works=work7,
0373 )
0374
0375 works = cond7.all_works()
0376 works.sort(key=lambda x: x.work_id)
0377 assert works == [work1, work2, work3, work4, work5, work6, work7]
0378 works = cond7.all_pre_works()
0379 works.sort(key=lambda x: x.work_id)
0380 assert works == [work1, work4, work5]
0381 works = cond7.all_next_works()
0382 works.sort(key=lambda x: x.work_id)
0383
0384 assert works == [work2, work3, work6, work7]
0385 cond_status = cond7.get_condition_status()
0386 assert cond_status is False
0387
0388 work4.status = WorkStatus.Finished
0389 cond_status = cond7.get_condition_status()
0390 assert cond_status is False
0391 work5.status = WorkStatus.Finished
0392 cond_status = cond7.get_condition_status()
0393 assert cond_status is True
0394 work4.status = WorkStatus.New
0395 work5.status = WorkStatus.New
0396
0397 works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0398 assert works == [work7]
0399 work4.status = WorkStatus.Finished
0400 work5.status = WorkStatus.Finished
0401 works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0402 works.sort(key=lambda x: x.work_id)
0403 assert works == [work3, work6]
0404 work1.status = WorkStatus.Finished
0405 works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0406 works.sort(key=lambda x: x.work_id)
0407 assert works == [work2, work6]
0408 work4.status = WorkStatus.New
0409 work5.status = WorkStatus.New
0410 work1.status = WorkStatus.New
0411
0412 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0413 assert works == [work7]
0414 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0415 assert works == []
0416 work4.status = WorkStatus.Finished
0417 work5.status = WorkStatus.Finished
0418 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0419 works.sort(key=lambda x: x.work_id)
0420 assert works == [work3, work6]
0421 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0422 assert works == []
0423 work1.status = WorkStatus.Finished
0424 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0425 works.sort(key=lambda x: x.work_id)
0426 assert works == [work2]
0427 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0428 works.sort(key=lambda x: x.work_id)
0429 assert works == []
0430 work4.status = WorkStatus.New
0431 work5.status = WorkStatus.New
0432 work1.status = WorkStatus.New
0433
0434 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0435 assert works == [work7]
0436 work4.status = WorkStatus.Finished
0437 work5.status = WorkStatus.Finished
0438 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0439 works.sort(key=lambda x: x.work_id)
0440 assert works == [work3, work6]
0441 work1.status = WorkStatus.Finished
0442 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0443 works.sort(key=lambda x: x.work_id)
0444 assert works == [work2, work6]
0445 work4.status = WorkStatus.New
0446 work5.status = WorkStatus.New
0447 work1.status = WorkStatus.New
0448
0449 return workflow
0450
0451 def print_workflow(self, workflow):
0452 print("print workflow")
0453 print(workflow.conditions)
0454 for cond_id in workflow.conditions:
0455 print(cond_id)
0456 cond = workflow.conditions[cond_id]
0457 print(cond)
0458 print(cond.conditions)
0459 print(cond.true_works)
0460 print(cond.false_works)
0461 for w in cond.true_works:
0462 print(w)
0463 if isinstance(w, CompositeCondition):
0464 print(w.conditions)
0465 print(w.true_works)
0466 print(w.false_works)
0467
0468 def test_workflow(self):
0469 work1 = Work(
0470 executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0471 )
0472 work2 = Work(
0473 executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
0474 )
0475 work3 = Work(
0476 executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
0477 )
0478 work4 = Work(
0479 executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
0480 )
0481 work5 = Work(
0482 executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
0483 )
0484 work6 = Work(
0485 executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
0486 )
0487 work7 = Work(
0488 executable="echo",
0489 arguments="--in=IN_DATASET --out=OUT_DATASET",
0490 sandbox=None,
0491 work_id=7,
0492 primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
0493 output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
0494 )
0495 work8 = Work(
0496 executable="echo",
0497 arguments="--in=IN_DATASET --out=OUT_DATASET",
0498 sandbox=None,
0499 work_id=8,
0500 primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
0501 output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
0502 )
0503
0504 workflow = Workflow()
0505 workflow.add_work(work1, initial=False)
0506 workflow.add_work(work2, initial=False)
0507 workflow.add_work(work3, initial=False)
0508 workflow.add_work(work4, initial=False)
0509 workflow.add_work(work5, initial=False)
0510 workflow.add_work(work6, initial=False)
0511 workflow.add_work(work7, initial=False)
0512 workflow.add_work(work8, initial=False)
0513
0514
0515 cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0516 cond7 = CompositeCondition(
0517 conditions=[work4.is_finished, work5.is_finished],
0518 true_works=[work6, cond6],
0519 false_works=work7,
0520 )
0521
0522 workflow.add_condition(cond7)
0523 id_works = workflow.independent_works
0524
0525 id_works.sort()
0526 id_works_1 = [work1, work4, work5, work8]
0527 id_works_1 = [w.get_template_id() for w in id_works_1]
0528 id_works_1.sort()
0529
0530 assert id_works == id_works_1
0531
0532 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0533
0534 workflow1 = json_loads(workflow_str)
0535
0536
0537 workflow1.load_metadata()
0538
0539
0540 workflow_str1 = json_dumps(workflow1, sort_keys=True, indent=4)
0541 assert workflow_str == workflow_str1
0542
0543 works = cond7.all_works()
0544 works.sort(key=lambda x: x.work_id)
0545 assert works == [work1, work2, work3, work4, work5, work6, work7]
0546 works = cond7.all_pre_works()
0547 works.sort(key=lambda x: x.work_id)
0548 assert works == [work1, work4, work5]
0549 works = cond7.all_next_works()
0550 works.sort(key=lambda x: x.work_id)
0551
0552 assert works == [work2, work3, work6, work7]
0553 cond_status = cond7.get_condition_status()
0554 assert cond_status is False
0555
0556 work4.status = WorkStatus.Finished
0557 cond_status = cond7.get_condition_status()
0558 assert cond_status is False
0559 work5.status = WorkStatus.Finished
0560 cond_status = cond7.get_condition_status()
0561 assert cond_status is True
0562 work4.status = WorkStatus.New
0563 work5.status = WorkStatus.New
0564
0565 works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0566 assert works == [work7]
0567 work4.status = WorkStatus.Finished
0568 work5.status = WorkStatus.Finished
0569 works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0570 works.sort(key=lambda x: x.work_id)
0571 assert works == [work3, work6]
0572 work1.status = WorkStatus.Finished
0573 works = cond7.get_next_works(trigger=ConditionTrigger.NotTriggered)
0574 works.sort(key=lambda x: x.work_id)
0575 assert works == [work2, work6]
0576 work4.status = WorkStatus.New
0577 work5.status = WorkStatus.New
0578 work1.status = WorkStatus.New
0579
0580 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0581 assert works == [work7]
0582 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0583 assert works == []
0584 work4.status = WorkStatus.Finished
0585 work5.status = WorkStatus.Finished
0586 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0587 works.sort(key=lambda x: x.work_id)
0588 assert works == [work3, work6]
0589 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0590 assert works == []
0591 work1.status = WorkStatus.Finished
0592 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0593 works.sort(key=lambda x: x.work_id)
0594 assert works == [work2]
0595 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0596 works.sort(key=lambda x: x.work_id)
0597 assert works == []
0598 work4.status = WorkStatus.New
0599 work5.status = WorkStatus.New
0600 work1.status = WorkStatus.New
0601
0602 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0603 assert works == [work7]
0604 work4.status = WorkStatus.Finished
0605 work5.status = WorkStatus.Finished
0606 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0607 works.sort(key=lambda x: x.work_id)
0608 assert works == [work3, work6]
0609 work1.status = WorkStatus.Finished
0610 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0611 works.sort(key=lambda x: x.work_id)
0612 assert works == [work2, work6]
0613 work4.status = WorkStatus.New
0614 work5.status = WorkStatus.New
0615 work1.status = WorkStatus.New
0616
0617 return workflow
0618
0619 def test_workflow_condition_reload(self):
0620 work1 = Work(
0621 executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
0622 )
0623 work2 = Work(
0624 executable="/bin/hostname", arguments=None, sandbox=None, work_id=2
0625 )
0626 work3 = Work(
0627 executable="/bin/hostname", arguments=None, sandbox=None, work_id=3
0628 )
0629 work4 = Work(
0630 executable="/bin/hostname", arguments=None, sandbox=None, work_id=4
0631 )
0632 work5 = Work(
0633 executable="/bin/hostname", arguments=None, sandbox=None, work_id=5
0634 )
0635 work6 = Work(
0636 executable="/bin/hostname", arguments=None, sandbox=None, work_id=6
0637 )
0638 work7 = Work(
0639 executable="echo",
0640 arguments="--in=IN_DATASET --out=OUT_DATASET",
0641 sandbox=None,
0642 work_id=7,
0643 primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
0644 output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
0645 )
0646 work8 = Work(
0647 executable="echo",
0648 arguments="--in=IN_DATASET --out=OUT_DATASET",
0649 sandbox=None,
0650 work_id=8,
0651 primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
0652 output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
0653 )
0654
0655 workflow = Workflow()
0656 workflow.add_work(work1, initial=False)
0657 workflow.add_work(work2, initial=False)
0658 workflow.add_work(work3, initial=False)
0659 workflow.add_work(work4, initial=False)
0660 workflow.add_work(work5, initial=False)
0661 workflow.add_work(work6, initial=False)
0662 workflow.add_work(work7, initial=False)
0663 workflow.add_work(work8, initial=False)
0664
0665
0666 cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
0667 cond7 = CompositeCondition(
0668 conditions=[work4.is_finished, work5.is_finished],
0669 true_works=[work6, cond6],
0670 false_works=work7,
0671 )
0672
0673 workflow.add_condition(cond7)
0674
0675 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0676
0677 workflow1 = json_loads(workflow_str)
0678
0679
0680 workflow1.load_metadata()
0681
0682
0683 workflow_str1 = json_dumps(workflow1, sort_keys=True, indent=4)
0684 assert workflow_str == workflow_str1
0685
0686 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0687 assert works == [work7]
0688 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0689 assert works == []
0690 work4.status = WorkStatus.Finished
0691 work5.status = WorkStatus.Finished
0692 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0693 works.sort(key=lambda x: x.work_id)
0694 assert works == [work3, work6]
0695 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0696 assert works == []
0697 work1.status = WorkStatus.Finished
0698 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0699 works.sort(key=lambda x: x.work_id)
0700 assert works == [work2]
0701 works = cond7.get_next_works(trigger=ConditionTrigger.ToTrigger)
0702 works.sort(key=lambda x: x.work_id)
0703 assert works == []
0704 work4.status = WorkStatus.New
0705 work5.status = WorkStatus.New
0706 work1.status = WorkStatus.New
0707
0708 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0709 assert works == [work7]
0710 work4.status = WorkStatus.Finished
0711 work5.status = WorkStatus.Finished
0712 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0713 works.sort(key=lambda x: x.work_id)
0714 assert works == [work3, work6]
0715 work1.status = WorkStatus.Finished
0716 works = cond7.get_next_works(trigger=ConditionTrigger.Triggered)
0717 works.sort(key=lambda x: x.work_id)
0718 assert works == [work2, work6]
0719 work4.status = WorkStatus.New
0720 work5.status = WorkStatus.New
0721 work1.status = WorkStatus.New
0722
0723 return workflow