File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 """
0014 Test client.
0015 """
0016
0017 import logging
0018 import string
0019 import random
0020
0021 logging.basicConfig(level=logging.DEBUG)
0022
0023
0024
0025
0026
0027
0028
0029 from idds.client.clientmanager import ClientManager
0030
0031
0032
0033
0034
0035
0036
0037 from idds.workflowv2.workflow import Workflow, Condition
0038
0039 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0040
0041
0042 task_queue = 'DOMA_LSST_GOOGLE_TEST'
0043
0044
0045 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0046 return ''.join(random.choice(chars) for _ in range(N))
0047
0048
0049 class PanDATask(object):
0050 name = None
0051 step = None
0052 dependencies = []
0053
0054
0055 def setup_workflow():
0056
0057 taskN1 = PanDATask()
0058 taskN1.step = "step1"
0059 taskN1.name = taskN1.step + "_" + randStr()
0060 taskN1.dependencies = [
0061 {"name": "00000" + str(k),
0062 "dependencies": [],
0063 "submitted": False} for k in range(6)
0064 ]
0065
0066 taskN2 = PanDATask()
0067 taskN2.step = "step2"
0068 taskN2.name = taskN2.step + "_" + randStr()
0069 taskN2.dependencies = [
0070 {
0071 "name": "000010",
0072 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0073 {"task": taskN1.name, "inputname": "000002", "available": False}],
0074 "submitted": False
0075 },
0076 {
0077 "name": "000011",
0078 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0079 {"task": taskN1.name, "inputname": "000002", "available": False}],
0080 "submitted": False
0081 },
0082 {
0083 "name": "000012",
0084 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0085 {"task": taskN1.name, "inputname": "000002", "available": False}],
0086 "submitted": False
0087 }
0088 ]
0089
0090 taskN3 = PanDATask()
0091 taskN3.step = "step3"
0092 taskN3.name = taskN3.step + "_" + randStr()
0093 taskN3.dependencies = [
0094 {
0095 "name": "000020",
0096 "dependencies": [],
0097 "submitted": False
0098 },
0099 {
0100 "name": "000021",
0101 "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0102 {"task": taskN2.name, "inputname": "000011", "available": False}],
0103 "submitted": False
0104 },
0105 {
0106 "name": "000022",
0107 "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0108 {"task": taskN2.name, "inputname": "000012", "available": False}],
0109 "submitted": False
0110 },
0111 {
0112 "name": "000023",
0113 "dependencies": [],
0114 "submitted": False
0115 },
0116 {
0117 "name": "000024",
0118 "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0119 {"task": taskN3.name, "inputname": "000023", "available": False}],
0120 "submitted": False
0121 },
0122 ]
0123
0124 work1 = DomaPanDAWork(executable='echo',
0125 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0126 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0127 log_collections=[], dependency_map=taskN1.dependencies,
0128 task_name=taskN1.name, task_queue=task_queue,
0129 encode_command_line=True,
0130 task_log={"dataset": "PandaJob_#{pandaid}/",
0131 "destination": "local",
0132 "param_type": "log",
0133 "token": "local",
0134 "type": "template",
0135 "value": "log.tgz"},
0136 task_cloud='LSST',
0137 task_priority=None)
0138 work2 = DomaPanDAWork(executable='echo',
0139 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0140 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0141 log_collections=[], dependency_map=taskN2.dependencies,
0142 task_name=taskN2.name, task_queue=task_queue,
0143 encode_command_line=True,
0144 task_log={"dataset": "PandaJob_#{pandaid}/",
0145 "destination": "local",
0146 "param_type": "log",
0147 "token": "local",
0148 "type": "template",
0149 "value": "log.tgz"},
0150 task_cloud='LSST')
0151 work3 = DomaPanDAWork(executable='echo',
0152 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0153 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0154 log_collections=[], dependency_map=taskN3.dependencies,
0155 task_name=taskN3.name, task_queue=task_queue,
0156 encode_command_line=True,
0157 task_log={"dataset": "PandaJob_#{pandaid}/",
0158 "destination": "local",
0159 "param_type": "log",
0160 "token": "local",
0161 "type": "template",
0162 "value": "log.tgz"},
0163 task_cloud='LSST')
0164
0165 cond1 = Condition(cond=work1.is_finished, true_work=work2)
0166 cond2 = Condition(cond=work2.is_finished, true_work=work3)
0167
0168 pending_time = 12
0169
0170 workflow = Workflow(pending_time=pending_time)
0171 workflow.add_work(work1)
0172 workflow.add_work(work2)
0173 workflow.add_work(work3)
0174 workflow.add_condition(cond1)
0175 workflow.add_condition(cond2)
0176 return workflow
0177
0178
0179 if __name__ == '__main__':
0180
0181 workflow = setup_workflow()
0182
0183 wm = ClientManager(host=None)
0184 request_id = wm.submit(workflow)
0185 print(request_id)