File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test workflow condtions.
0014 """
0015
0016
0017 import logging
0018
0019
0020 from idds.common.utils import setup_logging, get_logger
0021
0022 from idds.common.utils import json_dumps, json_loads
0023
0024 from idds.common.dict_class import DictClass
0025 from idds.workflowv2.work import Work, WorkStatus
0026 from idds.workflowv2.workflow import (
0027 Condition,
0028 Workflow,
0029 )
0030
0031
0032 setup_logging(__name__)
0033
0034 logger = logging.getLogger("main")
0035
0036
0037 def test_workflow_condition():
0038 work1 = Work(executable="/bin/hostname1", arguments=None, sandbox=None, work_id=1)
0039 work2 = Work(executable="/bin/hostname2", arguments=None, sandbox=None, work_id=2)
0040
0041 workflow1 = Workflow()
0042 workflow1.add_work(work1, initial=False)
0043 workflow1.add_work(work2, initial=False)
0044
0045 workflow1.test = True
0046 cond = Condition(cond=work1.is_finished, true_work=work2)
0047 workflow1.add_condition(cond)
0048
0049 workflow_str = json_dumps(workflow1, sort_keys=True, indent=4)
0050
0051 workflow1 = json_loads(workflow_str)
0052
0053 workflow_str1 = json_dumps(workflow1, sort_keys=True, indent=4)
0054
0055
0056
0057
0058
0059 def test_workflow_subloopworkflow_reload():
0060 work1 = Work(executable="/bin/hostname1", arguments=None, sandbox=None, work_id=1)
0061 work2 = Work(executable="/bin/hostname2", arguments=None, sandbox=None, work_id=2)
0062
0063 workflow1 = Workflow()
0064 workflow1.add_work(work1, initial=False)
0065 workflow1.add_work(work2, initial=False)
0066
0067 cond = Condition(cond=work2.is_finished)
0068 workflow1.add_loop_condition(cond)
0069
0070 work3 = Work(executable="/bin/hostname3", arguments=None, sandbox=None, work_id=3)
0071 cond1 = Condition(cond=work3.is_finished, true_work=workflow1)
0072
0073 workflow = Workflow()
0074 workflow.add_work(work3, initial=False)
0075 workflow.add_work(workflow1, initial=False)
0076 workflow.add_condition(cond1)
0077
0078
0079
0080 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0081
0082 workflow = json_loads(workflow_str)
0083
0084
0085
0086 logger.info("1")
0087 works = workflow.get_new_works()
0088 works.sort(key=lambda x: x.work_id)
0089 assert works == [work3]
0090
0091
0092 for work in works:
0093
0094 work.transforming = True
0095 work.submitted = True
0096 work.status = WorkStatus.Finished
0097
0098
0099
0100
0101 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0102
0103 workflow = json_loads(workflow_str)
0104
0105 logger.info("2")
0106 works = workflow.get_new_works()
0107 works.sort(key=lambda x: x.work_id)
0108 print(works)
0109 assert works == [work1, work2]
0110 assert workflow.is_terminated() is False
0111
0112 for work in works:
0113 work.transforming = True
0114 work.submitted = True
0115 work.status = WorkStatus.Finished
0116
0117
0118 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0119
0120 workflow = json_loads(workflow_str)
0121
0122 logger.info("3")
0123 works = workflow.get_new_works()
0124 works.sort(key=lambda x: x.work_id)
0125
0126
0127 print(works)
0128 assert works == [work1, work2]
0129 assert workflow.is_terminated() is False
0130
0131 for work in works:
0132 work.transforming = True
0133 work.submitted = True
0134 work.status = WorkStatus.Failed
0135
0136
0137 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0138
0139 workflow = json_loads(workflow_str)
0140
0141 logger.info("4")
0142 works = workflow.get_new_works()
0143 works.sort(key=lambda x: x.work_id)
0144 assert works == []
0145 assert workflow.is_terminated() is True
0146
0147
0148 def test_workflow_subloopworkflow_reload1():
0149 work1 = Work(executable="/bin/hostname1", arguments=None, sandbox=None, work_id=1)
0150 work2 = Work(executable="/bin/hostname2", arguments=None, sandbox=None, work_id=2)
0151
0152 workflow1 = Workflow()
0153 workflow1.add_work(work1, initial=False)
0154 workflow1.add_work(work2, initial=False)
0155
0156 cond = Condition(cond=work2.is_finished)
0157 workflow1.add_loop_condition(cond)
0158
0159 work3 = Work(executable="/bin/hostname3", arguments=None, sandbox=None, work_id=3)
0160 cond1 = Condition(cond=workflow1.is_terminated, true_work=work3)
0161
0162 workflow = Workflow()
0163 workflow.add_work(work3, initial=False)
0164 workflow.add_work(workflow1, initial=False)
0165 workflow.add_condition(cond1)
0166
0167
0168
0169 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0170
0171 workflow = json_loads(workflow_str)
0172
0173
0174
0175 logger.info("1")
0176 works = workflow.get_new_works()
0177 works.sort(key=lambda x: x.work_id)
0178 assert works == [work1, work2]
0179
0180
0181 for work in works:
0182
0183 work.transforming = True
0184 work.submitted = True
0185 work.status = WorkStatus.Finished
0186
0187
0188
0189
0190 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0191
0192 workflow = json_loads(workflow_str)
0193
0194 logger.info("2")
0195 works = workflow.get_new_works()
0196 works.sort(key=lambda x: x.work_id)
0197 print(works)
0198 assert works == [work1, work2]
0199 assert workflow.is_terminated() is False
0200
0201 for work in works:
0202 work.transforming = True
0203 work.submitted = True
0204 work.status = WorkStatus.Failed
0205
0206
0207 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0208
0209 workflow = json_loads(workflow_str)
0210
0211 logger.info("3")
0212 works = workflow.get_new_works()
0213 works.sort(key=lambda x: x.work_id)
0214
0215
0216 print(works)
0217 assert works == [work3]
0218 assert workflow.is_terminated() is False
0219
0220 for work in works:
0221 work.transforming = True
0222 work.submitted = True
0223 work.status = WorkStatus.Finished
0224
0225
0226 workflow_str = json_dumps(workflow, sort_keys=True, indent=4)
0227
0228 workflow = json_loads(workflow_str)
0229
0230 logger.info("4")
0231 works = workflow.get_new_works()
0232 works.sort(key=lambda x: x.work_id)
0233 assert works == []
0234 assert workflow.is_terminated() is True
0235
0236
0237 if __name__ == "__main__":
0238 print("==================================================")
0239 print("test_workflow_condition")
0240 test_workflow_condition()
0241
0242 print("==================================================")
0243 print("test_workflow_subloopworkflow_reload")
0244 test_workflow_subloopworkflow_reload()
0245
0246 print("==================================================")
0247 print("test_workflow_subloopworkflow_reload1")
0248 test_workflow_subloopworkflow_reload1()