File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test workflow conditions.
0014 """
0015
0016 import unittest2 as unittest
0017
0018
0019 from idds.common.utils import setup_logging
0020
0021 from idds.common.utils import json_dumps, json_loads
0022
0023 from idds.workflowv2.work import Work, WorkStatus
0024 from idds.workflowv2.workflow import (
0025 CompositeCondition,
0026 AndCondition,
0027 OrCondition,
0028 Condition,
0029 ConditionTrigger,
0030 Workflow,
0031 ParameterLink,
0032 )
0033
0034
# Initialise logging through the iDDS helper, passing this module's name.
0035 setup_logging(__name__)
0036
0037
0038 class TestWorkflowCondtion(unittest.TestCase):
0039
def test_work_custom_condition(self):
    """Check how a Work evaluates its registered custom conditions.

    Five scenarios are replayed, each on a freshly built Work: one boolean
    condition, one string condition, two conditions registered without an
    explicit ``op``, two conditions registered with ``op="or"``, and one
    ``op="and"`` condition combined with two ``op="or"`` conditions.  In
    every scenario the status must start out False; attributes are then
    assigned step by step and ``get_custom_condition_status()`` is asserted
    after each step.
    """

    def replay(registrations, steps):
        # Build a fresh work, register the conditions in order, confirm the
        # initial status is False, then apply each group of attribute
        # assignments and compare the resulting status.
        work = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
        )
        for args, kwargs in registrations:
            work.add_custom_condition(*args, **kwargs)
        assert work.get_custom_condition_status() is False
        for assignments, expected in steps:
            for attr_name, attr_value in assignments:
                setattr(work, attr_name, attr_value)
            assert work.get_custom_condition_status() is expected
        return work

    # Scenario 1: a single condition whose expected value is the bool True.
    replay(
        [(("to_exit", True), {})],
        [
            ([("to_exit", False)], False),
            ([("to_exit", "False")], False),
            ([("to_exit", True)], True),
        ],
    )

    # Scenario 2: a single condition whose expected value is the string
    # "true"; the bool True must not satisfy it.
    replay(
        [(("to_exit", "true"), {})],
        [
            ([("to_exit", False)], False),
            ([("to_exit", "False")], False),
            ([("to_exit", True)], False),
            ([("to_exit", "true")], True),
        ],
    )

    # Scenario 3: two conditions registered without an explicit op; the
    # status only becomes True once both attributes are True.
    replay(
        [(("to_exit1", True), {}), (("to_exit2", True), {})],
        [
            ([("to_exit1", False)], False),
            ([("to_exit1", "False")], False),
            ([("to_exit1", True)], False),
            ([("to_exit2", "true")], False),
            ([("to_exit2", True)], True),
        ],
    )

    # Scenario 4: two op="or" conditions; either one being True is enough.
    replay(
        [
            (("to_exit1", True), {"op": "or"}),
            (("to_exit2", True), {"op": "or"}),
        ],
        [
            ([("to_exit1", False)], False),
            ([("to_exit1", "False")], False),
            ([("to_exit1", True)], True),
            ([("to_exit1", False), ("to_exit2", "true")], False),
            ([("to_exit2", True)], True),
        ],
    )

    # Scenario 5: one op="and" condition plus two op="or" conditions.
    mixed = replay(
        [
            (("to_exit", True), {"op": "and"}),
            (("to_exit1", True), {"op": "or"}),
            (("to_exit2", True), {"op": "or"}),
        ],
        [
            ([("to_exit1", False)], False),
            ([("to_exit1", "False")], False),
            ([("to_exit1", True)], True),
            ([("to_exit1", False), ("to_exit2", "true")], False),
            ([("to_exit2", True)], True),
            (
                [("to_exit1", False), ("to_exit2", False), ("to_exit", True)],
                True,
            ),
        ],
    )
    assert mixed.get_not_custom_condition_status() is False
0127
def test_condition(self):
    """Drive every condition class through its status and trigger checks.

    Eight works are created and four of them are added to a Workflow.  The
    test then builds, in order: a single-input CompositeCondition, a
    three-input CompositeCondition, an AndCondition, an OrCondition, a plain
    Condition, a CompositeCondition that nests a Condition among its
    true-works, and a CompositeCondition that uses another Condition's
    ``is_condition_true`` as one of its inputs.  For each one it asserts
    ``all_works()``, ``all_pre_works()``, ``all_next_works()``,
    ``get_condition_status()`` and ``get_next_works()`` under the
    NotTriggered, ToTrigger and Triggered modes while work statuses are
    flipped between New and Finished.

    Returns:
        The Workflow created at the start of the test.
    """
    new, finished = WorkStatus.New, WorkStatus.Finished
    not_triggered = ConditionTrigger.NotTriggered
    to_trigger = ConditionTrigger.ToTrigger
    triggered = ConditionTrigger.Triggered

    def set_status(status, *targets):
        # Assign the same status to each work, in the order given.
        for target in targets:
            target.status = status

    def by_work_id(items):
        # Sort the returned list in place (as the checks require) and hand
        # it back so it can be compared in a single expression.
        items.sort(key=lambda item: item.work_id)
        return items

    def upcoming(condition, mode):
        return condition.get_next_works(trigger=mode)

    work1, work2, work3, work4, work5, work6 = [
        Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )
        for work_id in range(1, 7)
    ]
    work7 = Work(
        executable="echo",
        arguments="--in=IN_DATASET --out=OUT_DATASET",
        sandbox=None,
        work_id=7,
        primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
        output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
    )
    work8 = Work(
        executable="echo",
        arguments="--in=IN_DATASET --out=OUT_DATASET",
        sandbox=None,
        work_id=8,
        primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
        output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
    )

    workflow = Workflow()
    for member, is_initial in (
        (work1, True),
        (work2, True),
        (work3, False),
        (work8, False),
    ):
        workflow.add_work(member, initial=is_initial)

    def check_trigger_modes(condition, to_finish, to_reset, on_false, on_true):
        # For each mode: ask for the next works while the condition is
        # false, finish the given works, ask again, then reset.  Under
        # ToTrigger every query is repeated and the repeat must be empty.
        for mode, repeat_is_empty in (
            (not_triggered, False),
            (to_trigger, True),
            (triggered, False),
        ):
            assert upcoming(condition, mode) == on_false
            if repeat_is_empty:
                assert upcoming(condition, mode) == []
            set_status(finished, *to_finish)
            assert upcoming(condition, mode) == on_true
            if repeat_is_empty:
                assert upcoming(condition, mode) == []
            set_status(new, *to_reset)

    def check_two_way(condition):
        # Condition on work1 alone: work2 on true, work3 on false.
        assert condition.all_works() == [work1, work2, work3]
        assert condition.all_pre_works() == [work1]
        assert condition.all_next_works() == [work2, work3]
        assert condition.get_condition_status() is False

        set_status(finished, work1)
        assert condition.get_condition_status() is True
        set_status(new, work1)

        check_trigger_modes(condition, (work1,), (work1,), [work3], [work2])

    def check_fan_in_layout(condition):
        # Conditions on work1..work3 leading to work4/5 (true) or work6/7.
        assert condition.all_works() == [
            work1,
            work2,
            work3,
            work4,
            work5,
            work6,
            work7,
        ]
        assert condition.all_pre_works() == [work1, work2, work3]
        assert condition.all_next_works() == [work4, work5, work6, work7]
        assert condition.get_condition_status() is False

    def check_all_required(condition):
        # The status turns True only after all three inputs are finished.
        check_fan_in_layout(condition)
        for pending in (work1, work2):
            set_status(finished, pending)
            assert condition.get_condition_status() is False
        set_status(finished, work3)
        assert condition.get_condition_status() is True
        set_status(new, work1, work2, work3)

        check_trigger_modes(
            condition,
            (work1, work2, work3),
            (work1, work2, work3),
            [work6, work7],
            [work4, work5],
        )

    # --- single-input CompositeCondition --------------------------------
    cond1 = CompositeCondition(
        conditions=work1.is_finished, true_works=work2, false_works=work3
    )
    check_two_way(cond1)

    # --- three-input CompositeCondition ---------------------------------
    cond2 = CompositeCondition(
        conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
        true_works=[work4, work5],
        false_works=[work6, work7],
    )
    check_all_required(cond2)

    # --- AndCondition ---------------------------------------------------
    cond3 = AndCondition(
        conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
        true_works=[work4, work5],
        false_works=[work6, work7],
    )
    check_all_required(cond3)

    # --- OrCondition: any single finished input is enough ----------------
    cond4 = OrCondition(
        conditions=[work1.is_finished, work2.is_finished, work3.is_finished],
        true_works=[work4, work5],
        false_works=[work6, work7],
    )
    check_fan_in_layout(cond4)
    for lone in (work1, work2):
        set_status(finished, lone)
        assert cond4.get_condition_status() is True
        set_status(new, lone)
    set_status(finished, work3)
    assert cond4.get_condition_status() is True
    set_status(new, work1, work2, work3)

    check_trigger_modes(
        cond4,
        (work1,),
        (work1, work2, work3),
        [work6, work7],
        [work4, work5],
    )

    # --- plain Condition -------------------------------------------------
    cond5 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
    check_two_way(cond5)

    # --- CompositeCondition with a Condition nested in its true-works ----
    inner = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
    nested = CompositeCondition(
        conditions=[work4.is_finished, work5.is_finished],
        true_works=[work6, inner],
        false_works=work7,
    )

    assert by_work_id(nested.all_works()) == [
        work1,
        work2,
        work3,
        work4,
        work5,
        work6,
        work7,
    ]
    assert by_work_id(nested.all_pre_works()) == [work1, work4, work5]
    assert by_work_id(nested.all_next_works()) == [work2, work3, work6, work7]
    assert nested.get_condition_status() is False

    set_status(finished, work4)
    assert nested.get_condition_status() is False
    set_status(finished, work5)
    assert nested.get_condition_status() is True
    set_status(new, work4, work5)

    def check_nested_repeatable(mode):
        # The outer false branch gives work7; once the outer condition
        # holds, the inner condition selects work3 and then work2.
        assert upcoming(nested, mode) == [work7]
        set_status(finished, work4, work5)
        assert by_work_id(upcoming(nested, mode)) == [work3, work6]
        set_status(finished, work1)
        assert by_work_id(upcoming(nested, mode)) == [work2, work6]
        set_status(new, work4, work5, work1)

    check_nested_repeatable(not_triggered)

    assert upcoming(nested, to_trigger) == [work7]
    assert upcoming(nested, to_trigger) == []
    set_status(finished, work4, work5)
    assert by_work_id(upcoming(nested, to_trigger)) == [work3, work6]
    assert upcoming(nested, to_trigger) == []
    set_status(finished, work1)
    # work6 was already handed out above, so only work2 is new here.
    assert by_work_id(upcoming(nested, to_trigger)) == [work2]
    assert by_work_id(upcoming(nested, to_trigger)) == []
    set_status(new, work4, work5, work1)

    check_nested_repeatable(triggered)

    # --- CompositeCondition fed by another Condition's truth value -------
    gate = Condition(cond=work1.is_finished)
    gated = CompositeCondition(
        conditions=[work4.is_finished, gate.is_condition_true],
        true_works=[work6],
        false_works=work7,
    )

    assert by_work_id(gated.all_works()) == [work1, work4, work6, work7]
    assert by_work_id(gated.all_pre_works()) == [work1, work4]
    assert by_work_id(gated.all_next_works()) == [work6, work7]
    assert gated.get_condition_status() is False

    set_status(finished, work4)
    assert gated.get_condition_status() is False
    set_status(finished, work1)
    assert gated.get_condition_status() is True
    set_status(new, work4, work1)

    def check_gated_repeatable(mode):
        assert upcoming(gated, mode) == [work7]
        set_status(finished, work4, work1)
        assert by_work_id(upcoming(gated, mode)) == [work6]
        # work1 is assigned Finished a second time before re-querying.
        set_status(finished, work1)
        assert by_work_id(upcoming(gated, mode)) == [work6]
        set_status(new, work4, work1)

    check_gated_repeatable(not_triggered)

    assert upcoming(gated, to_trigger) == [work7]
    assert upcoming(gated, to_trigger) == []
    set_status(finished, work4, work1)
    assert by_work_id(upcoming(gated, to_trigger)) == [work6]
    assert upcoming(gated, to_trigger) == []
    set_status(finished, work1)
    assert by_work_id(upcoming(gated, to_trigger)) == []
    assert by_work_id(upcoming(gated, to_trigger)) == []
    set_status(new, work4, work1)

    check_gated_repeatable(triggered)

    # NOTE(review): unittest on Python 3.11+ warns when a test method
    # returns a non-None value; kept in case a caller uses the result.
    return workflow
0619
def print_workflow(self, workflow):
    """Print the conditions stored on *workflow* for manual inspection.

    For every key in ``workflow.conditions`` the key, the condition object
    and its ``conditions`` / ``true_works`` / ``false_works`` are printed.
    Each entry of ``true_works`` is printed too; entries that are themselves
    CompositeCondition instances also have their three attributes printed.
    """

    def show_branches(node):
        # Shared by top-level conditions and nested CompositeConditions.
        print(node.conditions)
        print(node.true_works)
        print(node.false_works)

    print("print workflow")
    print(workflow.conditions)
    for cond_id in workflow.conditions:
        print(cond_id)
        condition = workflow.conditions[cond_id]
        print(condition)
        show_branches(condition)
        for entry in condition.true_works:
            print(entry)
            if not isinstance(entry, CompositeCondition):
                continue
            show_branches(entry)
0636
def test_workflow(self):
    """Check conditions once they are registered on a Workflow.

    Eight works are added to a Workflow (all with ``initial=False``) and two
    CompositeConditions are registered: one nesting a Condition among its
    true-works, one using another Condition's ``is_condition_true`` as an
    input.  The test asserts the workflow's ``independent_works``, verifies
    that a ``json_dumps`` / ``json_loads`` / ``load_metadata`` round trip
    serialises to the same string, and then runs the status and
    ``get_next_works()`` checks on both conditions under the NotTriggered,
    ToTrigger and Triggered modes.

    Returns:
        The Workflow that was built.
    """
    new, finished = WorkStatus.New, WorkStatus.Finished
    not_triggered = ConditionTrigger.NotTriggered
    to_trigger = ConditionTrigger.ToTrigger
    triggered = ConditionTrigger.Triggered

    def set_status(status, *targets):
        # Assign the same status to each work, in the order given.
        for target in targets:
            target.status = status

    def by_work_id(items):
        # Sort the returned list in place and hand it back for comparison.
        items.sort(key=lambda item: item.work_id)
        return items

    def upcoming(condition, mode):
        return condition.get_next_works(trigger=mode)

    work1, work2, work3, work4, work5, work6 = [
        Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )
        for work_id in range(1, 7)
    ]
    work7 = Work(
        executable="echo",
        arguments="--in=IN_DATASET --out=OUT_DATASET",
        sandbox=None,
        work_id=7,
        primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
        output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
    )
    work8 = Work(
        executable="echo",
        arguments="--in=IN_DATASET --out=OUT_DATASET",
        sandbox=None,
        work_id=8,
        primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
        output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
    )

    workflow = Workflow()
    for member in (work1, work2, work3, work4, work5, work6, work7, work8):
        workflow.add_work(member, initial=False)

    inner = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
    nested = CompositeCondition(
        conditions=[work4.is_finished, work5.is_finished],
        true_works=[work6, inner],
        false_works=work7,
    )
    gate = Condition(cond=work1.is_finished)
    gated = CompositeCondition(
        conditions=[work4.is_finished, gate.is_condition_true],
        true_works=[work6],
        false_works=work7,
    )

    workflow.add_condition(nested)
    workflow.add_condition(gated)

    # The list obtained from the workflow is sorted in place, before the
    # workflow is serialised below; keep that order of operations.
    independent = workflow.independent_works
    independent.sort()
    expected_ids = sorted(
        member.get_template_id() for member in (work1, work4, work5, work8)
    )
    assert independent == expected_ids

    # Serialise, reload, and make sure the reloaded workflow dumps the same.
    dumped = json_dumps(workflow, sort_keys=True, indent=4)
    reloaded = json_loads(dumped)
    reloaded.load_metadata()
    dumped_again = json_dumps(reloaded, sort_keys=True, indent=4)
    assert dumped == dumped_again

    # --- CompositeCondition with a Condition nested in its true-works ----
    assert by_work_id(nested.all_works()) == [
        work1,
        work2,
        work3,
        work4,
        work5,
        work6,
        work7,
    ]
    assert by_work_id(nested.all_pre_works()) == [work1, work4, work5]
    assert by_work_id(nested.all_next_works()) == [work2, work3, work6, work7]
    assert nested.get_condition_status() is False

    set_status(finished, work4)
    assert nested.get_condition_status() is False
    set_status(finished, work5)
    assert nested.get_condition_status() is True
    set_status(new, work4, work5)

    def check_nested_repeatable(mode):
        # The outer false branch gives work7; once the outer condition
        # holds, the inner condition selects work3 and then work2.
        assert upcoming(nested, mode) == [work7]
        set_status(finished, work4, work5)
        assert by_work_id(upcoming(nested, mode)) == [work3, work6]
        set_status(finished, work1)
        assert by_work_id(upcoming(nested, mode)) == [work2, work6]
        set_status(new, work4, work5, work1)

    check_nested_repeatable(not_triggered)

    assert upcoming(nested, to_trigger) == [work7]
    assert upcoming(nested, to_trigger) == []
    set_status(finished, work4, work5)
    assert by_work_id(upcoming(nested, to_trigger)) == [work3, work6]
    assert upcoming(nested, to_trigger) == []
    set_status(finished, work1)
    # work6 was already handed out above, so only work2 is new here.
    assert by_work_id(upcoming(nested, to_trigger)) == [work2]
    assert by_work_id(upcoming(nested, to_trigger)) == []
    set_status(new, work4, work5, work1)

    check_nested_repeatable(triggered)

    # --- CompositeCondition fed by another Condition's truth value -------
    assert by_work_id(gated.all_works()) == [work1, work4, work6, work7]
    assert by_work_id(gated.all_pre_works()) == [work1, work4]
    assert by_work_id(gated.all_next_works()) == [work6, work7]
    assert gated.get_condition_status() is False

    set_status(finished, work4)
    assert gated.get_condition_status() is False
    set_status(finished, work1)
    assert gated.get_condition_status() is True
    set_status(new, work4, work1)

    def check_gated_repeatable(mode):
        assert upcoming(gated, mode) == [work7]
        set_status(finished, work4, work1)
        assert by_work_id(upcoming(gated, mode)) == [work6]
        # work1 is assigned Finished a second time before re-querying.
        set_status(finished, work1)
        assert by_work_id(upcoming(gated, mode)) == [work6]
        set_status(new, work4, work1)

    check_gated_repeatable(not_triggered)

    assert upcoming(gated, to_trigger) == [work7]
    assert upcoming(gated, to_trigger) == []
    set_status(finished, work4, work1)
    assert by_work_id(upcoming(gated, to_trigger)) == [work6]
    assert upcoming(gated, to_trigger) == []
    set_status(finished, work1)
    assert by_work_id(upcoming(gated, to_trigger)) == []
    assert by_work_id(upcoming(gated, to_trigger)) == []
    set_status(new, work4, work1)

    check_gated_repeatable(triggered)

    # NOTE(review): unittest on Python 3.11+ warns when a test method
    # returns a non-None value; kept in case a caller uses the result.
    return workflow
0873
def test_workflow_condition_reload(self):
    """
    Round-trip a workflow holding composite conditions through JSON and
    then walk both composite conditions through their trigger states.

    The reloaded copy is only compared through its JSON dump; the trigger
    assertions run against the condition objects built in this test.

    Returns the workflow that was built.
    """

    def hostname_work(work_id):
        # Plain work; siblings differ only by work_id.
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def echo_work(work_id, input_name, output_name):
        # Work with a primary input collection and one output collection.
        return Work(
            executable="echo",
            arguments="--in=IN_DATASET --out=OUT_DATASET",
            sandbox=None,
            work_id=work_id,
            primary_input_collection={"scope": "data17", "name": input_name},
            output_collections=[{"scope": "data17", "name": output_name}],
        )

    def by_work_id(item):
        return item.work_id

    def set_status(status, *items):
        # Assign the same status to every work, in the order given.
        for item in items:
            item.status = status

    work1, work2, work3, work4, work5, work6 = [
        hostname_work(number) for number in range(1, 7)
    ]
    work7 = echo_work(7, "data17.test.raw.1", "data17.test.work2")
    work8 = echo_work(8, "data17.test.work2", "data17.test.work3")

    workflow = Workflow()
    for member in (work1, work2, work3, work4, work5, work6, work7, work8):
        workflow.add_work(member, initial=False)

    # cond6 sits inside the true branch of cond7.
    cond6 = Condition(cond=work1.is_finished, true_work=work2, false_work=work3)
    cond7 = CompositeCondition(
        conditions=[work4.is_finished, work5.is_finished],
        true_works=[work6, cond6],
        false_works=work7,
    )

    # cond8 carries no works; cond9 uses its is_condition_true as a predicate.
    cond8 = Condition(cond=work1.is_finished)
    cond9 = CompositeCondition(
        conditions=[work4.is_finished, cond8.is_condition_true],
        true_works=[work6],
        false_works=work7,
    )

    for composite in (cond7, cond9):
        workflow.add_condition(composite)

    # Dump, load, reload metadata, dump again: both dumps must match.
    dump_options = {"sort_keys": True, "indent": 4}
    first_dump = json_dumps(workflow, **dump_options)

    reloaded = json_loads(first_dump)
    reloaded.load_metadata()

    second_dump = json_dumps(reloaded, **dump_options)
    assert first_dump == second_dump

    to_trigger = ConditionTrigger.ToTrigger
    triggered = ConditionTrigger.Triggered
    finished = WorkStatus.Finished
    new = WorkStatus.New

    # --- cond7, ToTrigger ---
    found = cond7.get_next_works(trigger=to_trigger)
    assert found == [work7]
    found = cond7.get_next_works(trigger=to_trigger)
    assert found == []
    set_status(finished, work4, work5)
    found = cond7.get_next_works(trigger=to_trigger)
    found.sort(key=by_work_id)
    assert found == [work3, work6]
    found = cond7.get_next_works(trigger=to_trigger)
    assert found == []
    set_status(finished, work1)
    found = cond7.get_next_works(trigger=to_trigger)
    found.sort(key=by_work_id)
    assert found == [work2]
    found = cond7.get_next_works(trigger=to_trigger)
    found.sort(key=by_work_id)
    assert found == []
    set_status(new, work4, work5, work1)

    # --- cond7, Triggered ---
    found = cond7.get_next_works(trigger=triggered)
    assert found == [work7]
    set_status(finished, work4, work5)
    found = cond7.get_next_works(trigger=triggered)
    found.sort(key=by_work_id)
    assert found == [work3, work6]
    set_status(finished, work1)
    found = cond7.get_next_works(trigger=triggered)
    found.sort(key=by_work_id)
    assert found == [work2, work6]
    set_status(new, work4, work5, work1)

    # --- cond9, ToTrigger ---
    found = cond9.get_next_works(trigger=to_trigger)
    assert found == [work7]
    found = cond9.get_next_works(trigger=to_trigger)
    assert found == []
    set_status(finished, work4, work1)
    found = cond9.get_next_works(trigger=to_trigger)
    found.sort(key=by_work_id)
    assert found == [work6]
    found = cond9.get_next_works(trigger=to_trigger)
    assert found == []
    # work1 is assigned Finished a second time, as in the original flow.
    set_status(finished, work1)
    found = cond9.get_next_works(trigger=to_trigger)
    found.sort(key=by_work_id)
    assert found == []
    found = cond9.get_next_works(trigger=to_trigger)
    found.sort(key=by_work_id)
    assert found == []
    set_status(new, work4, work1)

    # --- cond9, Triggered ---
    found = cond9.get_next_works(trigger=triggered)
    assert found == [work7]
    set_status(finished, work4, work1)
    found = cond9.get_next_works(trigger=triggered)
    found.sort(key=by_work_id)
    assert found == [work6]
    set_status(finished, work1)
    found = cond9.get_next_works(trigger=triggered)
    found.sort(key=by_work_id)
    assert found == [work6]
    set_status(new, work4, work1)

    return workflow
1025
def test_workflow_loop(self):
    """
    Check that a workflow with a loop condition produces the same JSON
    dump after being loaded back and having its metadata reloaded.
    """
    members = [
        Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=number,
        )
        for number in (1, 2)
    ]

    workflow = Workflow()
    for member in members:
        workflow.add_work(member, initial=False)

    # The loop condition is bound to the second work's is_finished.
    loop_cond = Condition(cond=members[1].is_finished)
    workflow.add_loop_condition(loop_cond)

    dump_options = {"sort_keys": True, "indent": 4}
    before = json_dumps(workflow, **dump_options)

    restored = json_loads(before)
    restored.load_metadata()

    after = json_dumps(restored, **dump_options)
    assert before == after
1051
def test_workflow_loop1(self):
    """
    Check the first batch of a workflow with a loop condition: both works
    are returned by get_new_works and num_run reads 1.

    Returns the workflow that was built.
    """
    first, second = [
        Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=number,
        )
        for number in (1, 2)
    ]

    workflow = Workflow()
    for member in (first, second):
        workflow.add_work(member, initial=False)

    loop_cond = Condition(cond=second.is_finished)
    workflow.add_loop_condition(loop_cond)

    batch = workflow.get_new_works()
    batch.sort(key=lambda item: item.work_id)
    assert batch == [first, second]
    assert workflow.num_run == 1

    return workflow
1075
def test_workflow_loop2(self):
    """
    Drive a workflow with a loop condition through several runs.

    Asserts that after each batch is marked Finished the same two works
    are returned again with num_run incremented (1, 2, 3), and that after
    a batch is marked Failed nothing is returned and num_run stays at 3.

    Returns the workflow that was built.
    """

    def by_work_id(item):
        return item.work_id

    def settle(items, status):
        # Set the transforming flag, then assign the given status.
        for item in items:
            item.transforming = True
            item.status = status

    first, second = [
        Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=number,
        )
        for number in (1, 2)
    ]

    workflow = Workflow()
    for member in (first, second):
        workflow.add_work(member, initial=False)

    loop_cond = Condition(cond=second.is_finished)
    workflow.add_loop_condition(loop_cond)

    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [first, second]
    assert workflow.num_run == 1

    # Two successful rounds: each one hands the same works out again.
    for expected_run in (2, 3):
        settle(batch, WorkStatus.Finished)
        batch = workflow.get_new_works()
        batch.sort(key=by_work_id)
        assert batch == [first, second]
        assert workflow.num_run == expected_run

    # A failed round yields no new works and leaves the counter alone.
    settle(batch, WorkStatus.Failed)
    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert workflow.num_run == 3

    return workflow
1127
def test_workflow_subworkflow(self):
    """
    Nest a two-work workflow inside an outer workflow with one extra work.

    Asserts that all three works are returned at once, and that after all
    of them are marked Failed nothing more is returned and the outer
    workflow reports itself terminated.
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def by_work_id(item):
        return item.work_id

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    work3 = hostname_work(3)

    outer = Workflow()
    for member in (work3, inner):
        outer.add_work(member, initial=False)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2, work3]

    for item in batch:
        item.transforming = True
        item.status = WorkStatus.Failed

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []

    assert outer.is_terminated() is True
1165
def test_workflow_subworkflow1(self):
    """
    Gate a nested workflow behind a condition on work3.is_finished and
    mark work3 as Failed.

    Asserts that only work3 is returned first, and that after it fails
    nothing more is returned and the outer workflow is terminated.
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def by_work_id(item):
        return item.work_id

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    work3 = hostname_work(3)
    gate = Condition(cond=work3.is_finished, true_work=inner)

    outer = Workflow()
    for member in (work3, inner):
        outer.add_work(member, initial=False)
    outer.add_condition(gate)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work3]

    for item in batch:
        item.transforming = True
        item.status = WorkStatus.Failed

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert outer.is_terminated() is True
1202
def test_workflow_subworkflow2(self):
    """
    Gate a nested workflow behind a condition on work3.is_finished and
    mark every batch as Finished.

    Asserts the sequence of batches: [work3], then [work1, work2] (outer
    workflow not terminated), then [] (outer workflow terminated).
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def by_work_id(item):
        return item.work_id

    def finish(items):
        # Set the transforming flag, then mark the work Finished.
        for item in items:
            item.transforming = True
            item.status = WorkStatus.Finished

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    work3 = hostname_work(3)
    gate = Condition(cond=work3.is_finished, true_work=inner)

    outer = Workflow()
    for member in (work3, inner):
        outer.add_work(member, initial=False)
    outer.add_condition(gate)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work3]

    finish(batch)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2]
    assert outer.is_terminated() is False

    finish(batch)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert outer.is_terminated() is True
1248
def test_workflow_subloopworkflow(self):
    """
    Nest a workflow that has a loop condition inside an outer workflow.

    Asserts that all three works are returned in the first batch and that
    the outer workflow is terminated once all of them are marked Failed.
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    loop_cond = Condition(cond=work2.is_finished)
    inner.add_loop_condition(loop_cond)

    work3 = hostname_work(3)

    outer = Workflow()
    for member in (work3, inner):
        outer.add_work(member, initial=False)

    batch = outer.get_new_works()
    batch.sort(key=lambda item: item.work_id)
    assert batch == [work1, work2, work3]

    for item in batch:
        item.transforming = True
        item.status = WorkStatus.Failed
    assert outer.is_terminated() is True
1282
def test_workflow_subloopworkflow1(self):
    """
    Nest a looping workflow inside an outer workflow and run it twice.

    Asserts that the first batch holds all three works, that after they
    are Finished the outer workflow is not terminated and the two inner
    works are returned again, and that after those are Failed nothing is
    returned and the outer workflow is terminated.
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def by_work_id(item):
        return item.work_id

    def settle(items, status):
        # Set the transforming flag, then assign the given status.
        for item in items:
            item.transforming = True
            item.status = status

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    loop_cond = Condition(cond=work2.is_finished)
    inner.add_loop_condition(loop_cond)

    work3 = hostname_work(3)

    outer = Workflow()
    for member in (work3, inner):
        outer.add_work(member, initial=False)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2, work3]

    settle(batch, WorkStatus.Finished)
    assert outer.is_terminated() is False

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2]

    settle(batch, WorkStatus.Failed)
    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []

    assert outer.is_terminated() is True
1332
def test_workflow_subloopworkflow2(self):
    """
    Gate a looping nested workflow behind work3.is_finished and mark
    work3 as Failed.

    Asserts that only work3 is returned first, and that after it fails
    nothing more is returned and the outer workflow is terminated.
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def by_work_id(item):
        return item.work_id

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    loop_cond = Condition(cond=work2.is_finished)
    inner.add_loop_condition(loop_cond)

    work3 = hostname_work(3)
    gate = Condition(cond=work3.is_finished, true_work=inner)

    outer = Workflow()
    for member in (work3, inner):
        outer.add_work(member, initial=False)
    outer.add_condition(gate)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work3]

    for item in batch:
        item.transforming = True
        item.status = WorkStatus.Failed

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert outer.is_terminated() is True
1372
def test_workflow_subloopworkflow3(self):
    """
    Gate a looping nested workflow behind work3.is_finished and let the
    loop run until its works fail.

    Asserted sequence of batches: [work3]; [work1, work2] after work3 is
    Finished; [work1, work2] again after those are Finished; the same
    batch once more when get_new_works is called without any status
    change; and [] with the outer workflow terminated after the works
    are marked Failed.
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def by_work_id(item):
        return item.work_id

    def settle(items, status):
        # Set the transforming flag, then assign the given status.
        for item in items:
            item.transforming = True
            item.status = status

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    loop_cond = Condition(cond=work2.is_finished)
    inner.add_loop_condition(loop_cond)

    work3 = hostname_work(3)
    gate = Condition(cond=work3.is_finished, true_work=inner)

    outer = Workflow()
    for member in (work3, inner):
        outer.add_work(member, initial=False)
    outer.add_condition(gate)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work3]

    settle(batch, WorkStatus.Finished)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2]
    assert outer.is_terminated() is False

    settle(batch, WorkStatus.Finished)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2]
    assert outer.is_terminated() is False

    # Asking again without touching any status gives the same answer.
    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2]
    assert outer.is_terminated() is False

    settle(batch, WorkStatus.Failed)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert outer.is_terminated() is True
1439
def test_custom_condition(self):
    """
    Check a custom condition registered with key/value keywords.

    Asserts that the status is False right after registration and True
    once output_data carries the same key with the value True.
    """
    subject = Work(
        executable="/bin/hostname",
        arguments=None,
        sandbox=None,
        work_id=1,
    )
    subject.add_custom_condition(key="to_continue", value=True)
    assert subject.get_custom_condition_status() is False

    subject.output_data = {"to_continue": True}
    assert subject.get_custom_condition_status() is True
1449
def test_workflow_subloopworkflow4(self):
    """
    Gate a nested workflow whose loop condition is work2's custom
    condition status, and mark work3 as Failed.

    Asserts that only work3 is returned first, and that after it fails
    nothing more is returned and the outer workflow is terminated.
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def by_work_id(item):
        return item.work_id

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    # The inner loop is driven by a custom condition instead of is_finished.
    work2.add_custom_condition(key="to_continue", value=True)
    loop_cond = Condition(cond=work2.get_custom_condition_status)
    inner.add_loop_condition(loop_cond)

    work3 = hostname_work(3)
    gate = Condition(cond=work3.is_finished, true_work=inner)

    outer = Workflow()
    for member in (work3, inner):
        outer.add_work(member, initial=False)
    outer.add_condition(gate)

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work3]

    for item in batch:
        item.transforming = True
        item.status = WorkStatus.Failed

    batch = outer.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert outer.is_terminated() is True
1490
def test_workflow_subloopworkflow_reload(self):
    """
    Repeat the gated looping sub-workflow scenario while replacing the
    outer workflow with a JSON round-tripped copy before every step.

    Asserted sequence of batches: [work3]; [work1, work2] after work3 is
    Finished; [work1, work2] again after those are Finished; and [] with
    the workflow terminated after the works are marked Failed.
    """

    def hostname_work(work_id):
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
        )

    def by_work_id(item):
        return item.work_id

    def settle(items, status):
        # Set the transforming flag, then assign the given status.
        for item in items:
            item.transforming = True
            item.status = status

    def round_trip(obj):
        # Dump to JSON text and build a new object from that text.
        text = json_dumps(obj, sort_keys=True, indent=4)
        return json_loads(text)

    work1 = hostname_work(1)
    work2 = hostname_work(2)

    inner = Workflow()
    for member in (work1, work2):
        inner.add_work(member, initial=False)

    loop_cond = Condition(cond=work2.is_finished)
    inner.add_loop_condition(loop_cond)

    work3 = hostname_work(3)
    gate = Condition(cond=work3.is_finished, true_work=inner)

    workflow = Workflow()
    for member in (work3, inner):
        workflow.add_work(member, initial=False)
    workflow.add_condition(gate)

    workflow = round_trip(workflow)

    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work3]

    settle(batch, WorkStatus.Finished)

    workflow = round_trip(workflow)

    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2]
    assert workflow.is_terminated() is False

    settle(batch, WorkStatus.Finished)

    workflow = round_trip(workflow)

    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1, work2]
    assert workflow.is_terminated() is False

    settle(batch, WorkStatus.Failed)

    workflow = round_trip(workflow)

    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert workflow.is_terminated() is True
1570
def test_workflow_subloopworkflow_parameter_link(self):
    """
    Chain two works with a condition and a parameter link that maps
    primary_output_collection onto primary_input_collection.

    Asserts that work1 is returned first with its own collection names,
    that work2 is returned next with work1's output name as its input
    name, and that the workflow is terminated once work2 is Finished.
    """

    def collection_work(work_id, input_name, output_name):
        # Work with named primary input and output collections.
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
            primary_input_collection={
                "scope": "test_scop",
                "name": input_name,
            },
            primary_output_collection={
                "scope": "test_scop",
                "name": output_name,
            },
        )

    def by_work_id(item):
        return item.work_id

    def finish(items):
        # Set the transforming flag, then mark the work Finished.
        for item in items:
            item.transforming = True
            item.status = WorkStatus.Finished

    work1 = collection_work(1, "input_test_work_1", "output_test_work_1")
    work2 = collection_work(2, "input_test_work_2", "output_test_work_2")

    workflow1 = Workflow()
    for member in (work1, work2):
        workflow1.add_work(member, initial=False)

    chain = Condition(cond=work1.is_finished, true_work=work2)
    workflow1.add_condition(chain)

    link = ParameterLink(
        parameters=[
            {
                "source": "primary_output_collection",
                "destination": "primary_input_collection",
            }
        ]
    )
    workflow1.add_parameter_link(work1, work2, link)

    batch = workflow1.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1]

    head = batch[0]
    assert head.primary_input_collection.name == "input_test_work_1"
    assert head.primary_output_collection.name == "output_test_work_1"

    finish(batch)

    batch = workflow1.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work2]
    assert workflow1.is_terminated() is False
    head = batch[0]

    # work2's input name is now the output name of work1.
    assert head.primary_input_collection.name == "output_test_work_1"
    assert head.primary_output_collection.name == "output_test_work_2"

    finish(batch)

    batch = workflow1.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert workflow1.is_terminated() is True
1651
def test_workflow_subloopworkflow_parameter_link1(self):
    """
    Run the linked two-work chain under a loop condition on
    work2.is_finished, for two runs.

    Asserts the collection names seen in each step: in the first run
    work2's input name equals work1's output name; in the second run the
    output names read "output_test_work_1.2" and "output_test_work_2.2"
    and work2's input name follows work1's new output name. Marking work2
    as Failed in the second run ends the workflow.
    """

    def collection_work(work_id, input_name, output_name):
        # Work with named primary input and output collections.
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
            primary_input_collection={
                "scope": "test_scop",
                "name": input_name,
            },
            primary_output_collection={
                "scope": "test_scop",
                "name": output_name,
            },
        )

    def by_work_id(item):
        return item.work_id

    def settle(items, status):
        # Set the transforming flag, then assign the given status.
        for item in items:
            item.transforming = True
            item.status = status

    def check_names(item, input_name, output_name):
        assert item.primary_input_collection.name == input_name
        assert item.primary_output_collection.name == output_name

    work1 = collection_work(1, "input_test_work_1", "output_test_work_1")
    work2 = collection_work(2, "input_test_work_2", "output_test_work_2")

    workflow1 = Workflow()
    for member in (work1, work2):
        workflow1.add_work(member, initial=False)

    chain = Condition(cond=work1.is_finished, true_work=work2)
    workflow1.add_condition(chain)

    link = ParameterLink(
        parameters=[
            {
                "source": "primary_output_collection",
                "destination": "primary_input_collection",
            }
        ]
    )
    workflow1.add_parameter_link(work1, work2, link)

    loop_cond = Condition(cond=work2.is_finished)
    workflow1.add_loop_condition(loop_cond)

    # --- first run ---
    batch = workflow1.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1]

    check_names(batch[0], "input_test_work_1", "output_test_work_1")

    settle(batch, WorkStatus.Finished)

    batch = workflow1.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work2]
    assert workflow1.is_terminated() is False

    check_names(batch[0], "output_test_work_1", "output_test_work_2")

    settle(batch, WorkStatus.Finished)

    # --- second run ---
    batch = workflow1.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1]
    assert workflow1.is_terminated() is False

    check_names(batch[0], "input_test_work_1", "output_test_work_1.2")

    settle(batch, WorkStatus.Finished)

    batch = workflow1.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work2]
    assert workflow1.is_terminated() is False

    check_names(batch[0], "output_test_work_1.2", "output_test_work_2.2")

    settle(batch, WorkStatus.Failed)

    batch = workflow1.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert workflow1.is_terminated() is True
1766
def test_workflow_subloopworkflow_parameter_link2(self):
    """
    Put the linked, looping two-work chain inside an outer workflow that
    starts with work3, and also link work3's primary output collection to
    work1's primary input collection.

    Asserts the collection names at each step: work1's input name reads
    "output_test_work_3" in both runs, work2's input name follows work1's
    output name, and the second run's output names carry a ".2" suffix.
    Marking work2 as Failed in the second run ends the outer workflow.
    """

    def collection_work(work_id, input_name, output_name):
        # Work with named primary input and output collections.
        return Work(
            executable="/bin/hostname",
            arguments=None,
            sandbox=None,
            work_id=work_id,
            primary_input_collection={
                "scope": "test_scop",
                "name": input_name,
            },
            primary_output_collection={
                "scope": "test_scop",
                "name": output_name,
            },
        )

    def output_to_input_link():
        return ParameterLink(
            parameters=[
                {
                    "source": "primary_output_collection",
                    "destination": "primary_input_collection",
                }
            ]
        )

    def by_work_id(item):
        return item.work_id

    def settle(items, status):
        # Set the transforming flag, then assign the given status.
        for item in items:
            item.transforming = True
            item.status = status

    def check_names(item, input_name, output_name):
        assert item.primary_input_collection.name == input_name
        assert item.primary_output_collection.name == output_name

    work1 = collection_work(1, "input_test_work_1", "output_test_work_1")
    work2 = collection_work(2, "input_test_work_2", "output_test_work_2")

    workflow1 = Workflow()
    for member in (work1, work2):
        workflow1.add_work(member, initial=False)

    chain = Condition(cond=work1.is_finished, true_work=work2)
    workflow1.add_condition(chain)

    inner_link = output_to_input_link()
    workflow1.add_parameter_link(work1, work2, inner_link)

    loop_cond = Condition(cond=work2.is_finished)
    workflow1.add_loop_condition(loop_cond)

    work3 = collection_work(3, "input_test_work_3", "output_test_work_3")
    gate = Condition(cond=work3.is_finished, true_work=workflow1)
    outer_link = output_to_input_link()

    workflow = Workflow()
    for member in (work3, workflow1):
        workflow.add_work(member, initial=False)
    workflow.add_condition(gate)
    workflow.add_parameter_link(work3, work1, outer_link)

    # --- work3 runs first ---
    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work3]

    check_names(batch[0], "input_test_work_3", "output_test_work_3")

    settle(batch, WorkStatus.Finished)

    # --- inner workflow, first run ---
    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1]
    assert workflow.is_terminated() is False

    check_names(batch[0], "output_test_work_3", "output_test_work_1")

    settle(batch, WorkStatus.Finished)

    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work2]
    assert workflow1.is_terminated() is False

    check_names(batch[0], "output_test_work_1", "output_test_work_2")

    settle(batch, WorkStatus.Finished)

    # --- inner workflow, second run ---
    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work1]
    assert workflow1.is_terminated() is False

    check_names(batch[0], "output_test_work_3", "output_test_work_1.2")

    settle(batch, WorkStatus.Finished)

    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == [work2]
    assert workflow1.is_terminated() is False

    check_names(batch[0], "output_test_work_1.2", "output_test_work_2.2")

    settle(batch, WorkStatus.Failed)

    batch = workflow.get_new_works()
    batch.sort(key=by_work_id)
    assert batch == []
    assert workflow.is_terminated() is True