File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test workflow.
0014 """
0015
0016 import unittest2 as unittest
0017
0018
0019 from idds.common.utils import setup_logging
0020
0021 from idds.client.client import Client
0022 from idds.common.constants import RequestType, RequestStatus
0023 from idds.common.utils import get_rest_host
0024
0025 from idds.workflowv2.work import Work
0026 from idds.workflowv2.workflow import Condition, Workflow
0027
0028
# Configure logging at import time through the idds helper, passing this
# module's name so the tests below run with logging already set up.
setup_logging(__name__)
0030
0031
class TestWorkflow(unittest.TestCase):
    """Tests that build an iDDS workflow and submit it as a request."""

    def init(self):
        """Build the three-work workflow shared by the tests in this class.

        ``work1`` and ``work2`` are added with ``initial=True`` and ``work3``
        with ``initial=False``.  A ``Condition`` built from
        ``work2.is_finished`` with ``true_work=work3`` is attached to the
        workflow (presumably so that ``work3`` follows ``work2`` -- confirm
        against the ``Condition`` implementation).

        Returns:
            The populated ``Workflow`` instance.
        """
        work1 = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
        )
        work2 = Work(
            executable="echo",
            arguments="--in=IN_DATASET --out=OUT_DATASET",
            sandbox=None,
            work_id=2,
            primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
            output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
        )
        # work3 reads the collection that work2 writes.
        work3 = Work(
            executable="echo",
            arguments="--in=IN_DATASET --out=OUT_DATASET",
            sandbox=None,
            work_id=3,
            primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
            output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
        )

        workflow = Workflow()
        workflow.add_work(work1, initial=True)
        workflow.add_work(work2, initial=True)
        workflow.add_work(work3, initial=False)

        cond = Condition(cond=work2.is_finished, true_work=work3)
        workflow.add_condition(cond)

        return workflow

    def test_workflow(self):
        """Workflow: Test workflow"""
        # Reuse the shared builder instead of repeating the construction here,
        # so both tests always exercise the same workflow definition.
        workflow = self.init()

        works = workflow.get_current_works()

        # A freshly built workflow is expected to report no current works.
        # assertEqual is used rather than a bare ``assert`` so the check still
        # runs under ``python -O`` and prints both values on failure.
        self.assertEqual(works, [])

    def test_workflow_request(self):
        """Workflow: Test workflow request"""
        workflow = self.init()

        # Keyword arguments forwarded to ``Client.add_request``; the workflow
        # object itself travels inside ``request_metadata``.
        props = {
            "scope": "workflow",
            "name": workflow.name,
            "requester": "panda",
            "request_type": RequestType.Workflow,
            "transform_tag": "workflow",
            "status": RequestStatus.New,
            "priority": 0,
            "lifetime": 30,
            "request_metadata": {"workload_id": "20776840", "workflow": workflow},
        }

        # NOTE(review): this talks to the REST host returned by
        # ``get_rest_host()`` and only prints the result; nothing is asserted
        # about the returned request id.
        host = get_rest_host()
        client = Client(host=host)
        request_id = client.add_request(**props)
        print(request_id)