# File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
"""
Test client: build a small three-step DOMA PanDA test workflow and
submit it to iDDS via the PanDA client API.
"""
0016
0017 import string
0018 import random
0019 import time
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029 from idds.common.utils import get_rest_host
0030
0031
0032
0033
0034
0035 from idds.workflowv2.workflow import Workflow
0036
0037 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0038
0039 import idds.common.utils as idds_utils
0040 import pandaclient.idds_api
0041
0042
0043
0044
0045
0046 task_cloud = 'US'
0047 task_queue = "SLAC_Rubin"
0048 task_queue = 'BNL_OSG_2'
0049
0050
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a pseudo-random string of length *N* drawn from *chars*."""
    picks = [random.choice(chars) for _ in range(N)]
    return ''.join(picks)
0053
0054
class PanDATask(object):
    """Lightweight container describing one PanDA task definition."""

    def __init__(self):
        # name/step are filled in by the caller after construction.
        self.name = None
        self.step = None
        # Per-instance list. The original declared `dependencies = []` at
        # class level, so any in-place mutation would be shared across
        # every PanDATask instance.
        self.dependencies = []
0059
0060
def setup_workflow():
    """Build a three-step test Workflow of DomaPanDAWork tasks.

    step1 holds six independent pseudo jobs; step2 jobs depend on step1
    outputs; step3 jobs depend on step2 outputs and, for job 000024, on
    other jobs within step3 itself.

    Returns:
        Workflow: the assembled (not yet submitted) workflow.
    """

    def _dep(task, inputname):
        # One upstream-input record inside a dependency_map entry.
        return {"task": task.name, "inputname": inputname, "available": False}

    def _job(name, deps=None):
        # One pseudo-job record in a task's dependency_map.
        return {"name": name,
                "dependencies": deps if deps is not None else [],
                "submitted": False}

    def _new_task(step):
        # Task names get a random suffix so repeated runs don't collide.
        task = PanDATask()
        task.step = step
        task.name = task.step + "_" + randStr()
        return task

    def _make_work(index, task):
        # The three works share the same executable/log configuration and
        # differ only in their pseudo collection index and dependency map.
        return DomaPanDAWork(
            executable='echo',
            primary_input_collection={'scope': 'pseudo_dataset',
                                      'name': 'pseudo_input_collection#%d' % index},
            output_collections=[{'scope': 'pseudo_dataset',
                                 'name': 'pseudo_output_collection#%d' % index}],
            log_collections=[],
            dependency_map=task.dependencies,
            task_name=task.name,
            task_queue=task_queue,
            encode_command_line=True,
            task_log={"dataset": "PandaJob_#{pandaid}/",
                      "destination": "local",
                      "param_type": "log",
                      "token": "local",
                      "type": "template",
                      "value": "log.tgz"},
            task_cloud=task_cloud)

    taskN1 = _new_task("step1")
    taskN1.dependencies = [_job("00000" + str(k)) for k in range(6)]

    taskN2 = _new_task("step2")
    taskN2.dependencies = [
        _job(name, [_dep(taskN1, "000001"), _dep(taskN1, "000002")])
        for name in ("000010", "000011", "000012")
    ]

    taskN3 = _new_task("step3")
    taskN3.dependencies = [
        _job("000020"),
        _job("000021", [_dep(taskN2, "000010"), _dep(taskN2, "000011")]),
        _job("000022", [_dep(taskN2, "000011"), _dep(taskN2, "000012")]),
        _job("000023"),
        # 000024 depends on jobs within the same (step3) task.
        _job("000024", [_dep(taskN3, "000021"), _dep(taskN3, "000023")]),
    ]

    work1 = _make_work(1, taskN1)
    work2 = _make_work(2, taskN2)
    work3 = _make_work(3, taskN3)

    # How long the request may stay pending (units per iDDS server
    # configuration — presumably hours; TODO confirm).
    pending_time = 12

    workflow = Workflow(pending_time=pending_time)
    workflow.add_work(work1)
    workflow.add_work(work2)
    workflow.add_work(work3)
    # Timestamped name keeps repeated test submissions distinguishable.
    workflow.name = 'test_workflow.idds.%s' % time.time()
    return workflow
0178
0179
def submit(workflow, idds_server):
    """Submit *workflow* to iDDS through the PanDA client API.

    Args:
        workflow: the Workflow object to submit.
        idds_server: iDDS host to contact; None lets the client fall back
            to its configured default.
    """
    # NOTE(review): the original unconditionally reset idds_server to None
    # here, discarding the caller's host — honor the argument instead.
    c = pandaclient.idds_api.get_api(idds_utils.json_dumps, verbose=True,
                                     idds_host=idds_server, compress=True, manager=True)
    request_id = c.submit(workflow, username=None, use_dataset_name=False)
    # %-interpolate the id: print() is not logging-style, so the original
    # comma form emitted a literal "%s" followed by the id.
    print("Submitted into iDDs with request id=%s" % str(request_id))
0187
0188
if __name__ == '__main__':
    # Resolve the iDDS REST host from the local client configuration.
    host = get_rest_host()
    # Build the three-step test workflow...
    workflow = setup_workflow()




    # ...and submit it to the resolved host.
    submit(workflow, host)