Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023
0010 
0011 
0012 """
0013 Test workflow conditions.
0014 """
0015 
0016 # import json
0017 import logging
0018 
0019 # from nose.tools import assert_equal
0020 from idds.common.utils import setup_logging, get_logger  # noqa F401
0021 
0022 from idds.common.utils import json_dumps, json_loads  # noqa F401
0023 
0024 from idds.common.dict_class import DictClass  # noqa F401
0025 from idds.workflowv2.work import Work, WorkStatus
0026 from idds.workflowv2.workflow import (
0027     Condition,
0028     Workflow,
0029 )  # noqa F401
0030 
0031 
# Initialise logging through the iDDS helper, passing this module's name.
# NOTE(review): what the helper configures is defined in idds.common.utils
# and is not visible from this file.
setup_logging(__name__)

# The tests emit their step markers ("1".."4") through the logger named
# "main" -- note that this is not the module name handed to setup_logging.
logger = logging.getLogger("main")
0035 
0036 
def test_workflow_condition():
    """Round-trip a conditional two-work workflow through JSON.

    A workflow is built from two works plus a condition that names the
    second work as the ``true_work`` of ``first.is_finished``.  It is dumped
    with ``json_dumps``, rebuilt with ``json_loads`` and dumped a second
    time.  The test passes when none of these calls raises.
    """
    first = Work(executable="/bin/hostname1", arguments=None, sandbox=None, work_id=1)
    second = Work(executable="/bin/hostname2", arguments=None, sandbox=None, work_id=2)

    flow = Workflow()
    for member in (first, second):
        flow.add_work(member, initial=False)

    flow.test = True
    flow.add_condition(Condition(cond=first.is_finished, true_work=second))

    dumped = json_dumps(flow, sort_keys=True, indent=4)
    restored = json_loads(dumped)

    # Dump the restored object once more so that serialising a reloaded
    # workflow is exercised as well.
    # NOTE: the two dumps are not compared with each other; an equality
    # assertion between them used to be disabled at this point.
    json_dumps(restored, sort_keys=True, indent=4)
0057 
0058 
def test_workflow_subloopworkflow_reload():
    """Drive a looping sub-workflow that is triggered by a preceding work.

    Layout built here: an inner workflow holding works 1 and 2 with the loop
    condition ``work2.is_finished``, nested in an outer workflow next to
    work 3, with the condition ``work3.is_finished -> inner workflow``.

    Before every step the outer workflow is dumped to JSON and loaded back.
    The assertions expect, in order:

    1. only work 3 is new;
    2. after work 3 is marked Finished, works 1 and 2 are new;
    3. after they are marked Finished, works 1 and 2 are new again;
    4. after they are marked Failed, nothing is new and the outer workflow
       reports itself terminated.
    """

    def roundtrip(flow):
        """Return *flow* rebuilt from its own JSON dump."""
        return json_loads(json_dumps(flow, sort_keys=True, indent=4))

    def pull_sorted(flow):
        """Fetch the new works of *flow*, ordered in place by ``work_id``."""
        fresh = flow.get_new_works()
        fresh.sort(key=lambda item: item.work_id)
        return fresh

    def settle(batch, final_status):
        """Flag every work in *batch* as processed with *final_status*."""
        for item in batch:
            item.transforming = True
            item.submitted = True
            item.status = final_status

    work1 = Work(executable="/bin/hostname1", arguments=None, sandbox=None, work_id=1)
    work2 = Work(executable="/bin/hostname2", arguments=None, sandbox=None, work_id=2)

    inner = Workflow()
    inner.add_work(work1, initial=False)
    inner.add_work(work2, initial=False)
    inner.add_loop_condition(Condition(cond=work2.is_finished))

    work3 = Work(executable="/bin/hostname3", arguments=None, sandbox=None, work_id=3)
    trigger = Condition(cond=work3.is_finished, true_work=inner)

    outer = Workflow()
    outer.add_work(work3, initial=False)
    outer.add_work(inner, initial=False)
    outer.add_condition(trigger)

    # Step 1: only the triggering work is expected.
    outer = roundtrip(outer)
    logger.info("1")
    batch = pull_sorted(outer)
    assert batch == [work3]
    settle(batch, WorkStatus.Finished)

    # Step 2: with work 3 finished, the inner workflow's works are expected.
    outer = roundtrip(outer)
    logger.info("2")
    batch = pull_sorted(outer)
    print(batch)
    assert batch == [work1, work2]
    assert outer.is_terminated() is False
    settle(batch, WorkStatus.Finished)

    # Step 3: the same two works are expected a second time.
    outer = roundtrip(outer)
    logger.info("3")
    batch = pull_sorted(outer)
    print(batch)
    assert batch == [work1, work2]
    assert outer.is_terminated() is False
    settle(batch, WorkStatus.Failed)

    # Step 4: after the failure no further works are expected.
    outer = roundtrip(outer)
    logger.info("4")
    batch = pull_sorted(outer)
    assert batch == []
    assert outer.is_terminated() is True
0146 
0147 
def test_workflow_subloopworkflow_reload1():
    """Drive a looping sub-workflow whose termination triggers a later work.

    Layout built here: an inner workflow holding works 1 and 2 with the loop
    condition ``work2.is_finished``, nested in an outer workflow next to
    work 3, with the condition ``inner.is_terminated -> work 3``.

    Before every step the outer workflow is dumped to JSON and loaded back.
    The assertions expect, in order:

    1. works 1 and 2 are new;
    2. after they are marked Finished, works 1 and 2 are new again;
    3. after they are marked Failed, work 3 is new;
    4. after work 3 is marked Finished, nothing is new and the outer
       workflow reports itself terminated.
    """

    def roundtrip(flow):
        """Return *flow* rebuilt from its own JSON dump."""
        return json_loads(json_dumps(flow, sort_keys=True, indent=4))

    def pull_sorted(flow):
        """Fetch the new works of *flow*, ordered in place by ``work_id``."""
        fresh = flow.get_new_works()
        fresh.sort(key=lambda item: item.work_id)
        return fresh

    def settle(batch, final_status):
        """Flag every work in *batch* as processed with *final_status*."""
        for item in batch:
            item.transforming = True
            item.submitted = True
            item.status = final_status

    work1 = Work(executable="/bin/hostname1", arguments=None, sandbox=None, work_id=1)
    work2 = Work(executable="/bin/hostname2", arguments=None, sandbox=None, work_id=2)

    inner = Workflow()
    inner.add_work(work1, initial=False)
    inner.add_work(work2, initial=False)
    inner.add_loop_condition(Condition(cond=work2.is_finished))

    work3 = Work(executable="/bin/hostname3", arguments=None, sandbox=None, work_id=3)
    follow_up = Condition(cond=inner.is_terminated, true_work=work3)

    outer = Workflow()
    outer.add_work(work3, initial=False)
    outer.add_work(inner, initial=False)
    outer.add_condition(follow_up)

    # Step 1: the inner workflow's works are expected first.
    outer = roundtrip(outer)
    logger.info("1")
    batch = pull_sorted(outer)
    assert batch == [work1, work2]
    settle(batch, WorkStatus.Finished)

    # Step 2: the same two works are expected a second time.
    outer = roundtrip(outer)
    logger.info("2")
    batch = pull_sorted(outer)
    print(batch)
    assert batch == [work1, work2]
    assert outer.is_terminated() is False
    settle(batch, WorkStatus.Failed)

    # Step 3: after the failure, the follow-up work is expected.
    outer = roundtrip(outer)
    logger.info("3")
    batch = pull_sorted(outer)
    print(batch)
    assert batch == [work3]
    assert outer.is_terminated() is False
    settle(batch, WorkStatus.Finished)

    # Step 4: nothing is left to run.
    outer = roundtrip(outer)
    logger.info("4")
    batch = pull_sorted(outer)
    assert batch == []
    assert outer.is_terminated() is True
0235 
0236 
if __name__ == "__main__":
    # Script entry point: run each test in definition order, printing a
    # separator line and the test's name before calling it.
    separator = "=================================================="
    cases = (
        ("test_workflow_condition", test_workflow_condition),
        ("test_workflow_subloopworkflow_reload", test_workflow_subloopworkflow_reload),
        ("test_workflow_subloopworkflow_reload1", test_workflow_subloopworkflow_reload1),
    )
    for label, case in cases:
        print(separator)
        print(label)
        case()