Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020 - 2021
0010 
0011 
0012 """
0013 Test workflow.
0014 """
0015 
0016 import unittest2 as unittest
0017 
0018 # from nose.tools import assert_equal
0019 from idds.common.utils import setup_logging
0020 
0021 from idds.client.client import Client
0022 from idds.common.constants import RequestType, RequestStatus
0023 from idds.common.utils import get_rest_host
0024 
0025 from idds.workflowv2.work import Work  # Parameter, WorkStatus
0026 from idds.workflowv2.workflow import Condition, Workflow
0027 
0028 
# Initialise logging at import time through the iDDS ``setup_logging`` helper,
# passing this module's name.
setup_logging(__name__)
0030 
0031 
class TestWorkflow(unittest.TestCase):
    """Tests that build an iDDS ``Workflow`` and submit it as a request."""

    def init(self):
        """Build the three-work workflow shared by the tests in this class.

        ``work1`` and ``work2`` are registered as initial works.  ``work3`` is
        registered as a non-initial work and is the ``true_work`` of a
        condition on ``work2.is_finished``; its primary input collection is
        the collection ``work2`` lists as output.

        This is a plain helper rather than ``setUp``: unittest does not call
        it automatically, so each test invokes it explicitly.

        Returns:
            Workflow: the assembled workflow with its condition attached.
        """
        # init_p = Parameter({'input_dataset': 'data17:data17.test.raw.1'})
        work1 = Work(
            executable="/bin/hostname", arguments=None, sandbox=None, work_id=1
        )
        work2 = Work(
            executable="echo",
            arguments="--in=IN_DATASET --out=OUT_DATASET",
            sandbox=None,
            work_id=2,
            primary_input_collection={"scope": "data17", "name": "data17.test.raw.1"},
            output_collections=[{"scope": "data17", "name": "data17.test.work2"}],
        )
        work3 = Work(
            executable="echo",
            arguments="--in=IN_DATASET --out=OUT_DATASET",
            sandbox=None,
            work_id=3,
            primary_input_collection={"scope": "data17", "name": "data17.test.work2"},
            output_collections=[{"scope": "data17", "name": "data17.test.work3"}],
        )

        workflow = Workflow()
        workflow.add_work(work1, initial=True)
        workflow.add_work(work2, initial=True)
        workflow.add_work(work3, initial=False)

        # work3 is only reachable through this condition on work2.
        cond = Condition(cond=work2.is_finished, true_work=work3)
        workflow.add_condition(cond)

        return workflow

    def test_workflow(self):
        """Workflow: Test workflow"""
        # Reuse the shared builder instead of repeating the construction here.
        workflow = self.init()

        # The test pins that a freshly built workflow reports no current works.
        works = workflow.get_current_works()
        # unittest assertion: survives ``python -O`` and reports a diff on failure.
        self.assertEqual(works, [])

    def test_workflow_request(self):
        """Workflow: Submit the workflow as a new request

        Wraps the workflow from ``init`` in request properties, passes them to
        ``Client.add_request`` and prints the returned request id.

        NOTE(review): presumably needs a reachable iDDS REST service at the
        host returned by ``get_rest_host()`` -- confirm before running in CI.
        """
        workflow = self.init()

        props = {
            "scope": "workflow",
            "name": workflow.name,
            "requester": "panda",
            "request_type": RequestType.Workflow,
            "transform_tag": "workflow",
            "status": RequestStatus.New,
            "priority": 0,
            "lifetime": 30,
            "request_metadata": {"workload_id": "20776840", "workflow": workflow},
        }

        host = get_rest_host()
        client = Client(host=host)
        request_id = client.add_request(**props)
        print(request_id)