File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
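"""
Test client: build a five-task DOMA PanDA test workflow and submit it
through the iDDS ClientManager.

The optional first command-line argument selects the target site
("in2p3" or "lancs"); any other value, or no argument, selects SLAC.
"""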
0015
0016 import sys
0017 import string
0018 import random
0019 import time
0020
0021
0022
0023
0024
0025
0026
0027 from idds.client.clientmanager import ClientManager
0028
0029 from idds.common.utils import get_rest_host
0030
0031
0032
0033
0034
0035 from idds.workflowv2.workflow import Workflow
0036
0037 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0038
0039
# Site selection.
#
# The optional first command-line argument picks the target site; any value
# other than "in2p3" or "lancs" (including no argument at all) selects SLAC.
# Each branch defines the cloud plus five queue names that setup_workflow()
# assigns to its five works (task_queue .. task_queue4, in that order).
requested_site = sys.argv[1] if len(sys.argv) > 1 else None

if requested_site == "in2p3":
    site = 'in2p3'
    task_cloud = 'EU'

    task_queue = 'CC-IN2P3_Rubin'
    task_queue1 = 'CC-IN2P3_Rubin_Medium'
    task_queue2 = 'CC-IN2P3_Rubin_Himem'
    task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
    task_queue4 = 'CC-IN2P3_Rubin_Merge'
elif requested_site == "lancs":
    site = 'lancs'
    task_cloud = 'EU'

    task_queue = 'LANCS_Rubin'
    task_queue1 = 'LANCS_Rubin_Medium'
    task_queue2 = 'LANCS_Rubin_Himem'
    # NOTE(review): unlike the other sites, task_queue3 here is the Himem
    # queue rather than an Extra_Himem one (an earlier
    # 'LANCS_Rubin_Extra_Himem' assignment was immediately overwritten, so
    # this is the value that was always in effect) -- confirm it is intended.
    task_queue3 = 'LANCS_Rubin_Himem'
    task_queue4 = 'LANCS_Rubin_Merge'
else:
    site = 'slac'
    task_cloud = 'US'

    # A previous 'DOMA_LSST_GOOGLE_TEST' value for task_queue was a dead
    # store (overwritten before any use) and has been dropped.
    task_queue = 'SLAC_Rubin'
    task_queue1 = 'SLAC_Rubin_Medium'
    task_queue2 = 'SLAC_Rubin_Himem'
    task_queue3 = 'SLAC_Rubin_Extra_Himem'
    task_queue4 = 'SLAC_Rubin_Merge'
0073
0074
0075
0076
0077
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of ``N`` characters drawn from ``chars``.

    Each character is picked independently with ``random.choice``, so the
    result depends on the state of the module-level ``random`` generator.
    By default the alphabet is lowercase ASCII letters plus digits.
    """
    picked = []
    for _ in range(N):
        picked.append(random.choice(chars))
    return ''.join(picked)
0080
0081
class PanDATask(object):
    """Plain record describing one task of the test workflow.

    Attributes:
        name: task name (``None`` until assigned).
        step: step label of the task (``None`` until assigned).
        dependencies: list of job descriptions for the task. Every instance
            gets its own list, so mutating one task's list in place can
            never leak into another task.
    """

    # Class-level defaults kept so ``PanDATask.name`` / ``PanDATask.step``
    # remain readable on the class itself.
    name = None
    step = None

    def __init__(self, name=None, step=None, dependencies=None):
        """Create a task; all arguments are optional.

        Args:
            name: initial task name.
            step: initial step label.
            dependencies: initial job list; a fresh empty list is created
                when omitted, instead of sharing one mutable class attribute.
        """
        self.name = name
        self.step = step
        self.dependencies = [] if dependencies is None else dependencies
0086
0087
def setup_workflow():
    """Assemble the five-task DOMA PanDA test workflow.

    Five ``PanDATask`` records (step1 .. step5) are created, each named
    ``<site>_<step>_<random suffix>``. One ``DomaPanDAWork`` is built per
    task and all five are added, in order, to a ``Workflow`` created with
    ``pending_time=12``.

    Reads the module-level ``site``, ``task_cloud`` and ``task_queue`` ..
    ``task_queue4`` settings.

    Returns:
        The populated ``Workflow``; its name is
        ``<site>_test_workflow.idds.<time.time()>.test``.
    """

    def new_task(step):
        # Task record whose name embeds the site, the step and a random suffix.
        task = PanDATask()
        task.step = step
        task.name = site + "_" + task.step + "_" + randStr()
        return task

    def job(name, dependencies):
        # One job entry of a dependency map, initially not submitted.
        return {"name": name, "dependencies": dependencies, "submitted": False}

    def needs(task, *inputnames):
        # Dependency entries on the given jobs of ``task``, initially unavailable.
        return [{"task": task.name, "inputname": inputname, "available": False}
                for inputname in inputnames]

    def independent_jobs(prefix):
        # Six jobs without dependencies, named prefix + "0" .. prefix + "5".
        return [job(prefix + str(idx), []) for idx in range(6)]

    def panda_work(task, queue, priority, collection_no):
        # DomaPanDAWork running 'echo' for ``task``; only the queue, priority
        # and pseudo collection number differ between the five works.
        return DomaPanDAWork(executable='echo',
                             primary_input_collection={'scope': 'pseudo_dataset',
                                                       'name': 'pseudo_input_collection#' + str(collection_no)},
                             output_collections=[{'scope': 'pseudo_dataset',
                                                  'name': 'pseudo_output_collection#' + str(collection_no)}],
                             log_collections=[], dependency_map=task.dependencies,
                             task_name=task.name, task_queue=queue,
                             encode_command_line=True,
                             task_priority=priority,
                             prodSourceLabel='managed',
                             task_log={"dataset": "PandaJob_#{pandaid}/",
                                       "destination": "local",
                                       "param_type": "log",
                                       "token": "local",
                                       "type": "template",
                                       "value": "log.tgz"},
                             task_cloud=task_cloud)

    # step1: six independent jobs 000000 .. 000005.
    taskN1 = new_task("step1")
    taskN1.dependencies = independent_jobs("00000")

    # step2: three jobs, each waiting on step1 jobs 000001 and 000002.
    taskN2 = new_task("step2")
    taskN2.dependencies = [
        job(job_name, needs(taskN1, "000001", "000002"))
        for job_name in ("000010", "000011", "000012")
    ]

    # step3: a mix of independent jobs, jobs waiting on step2, and one job
    # (000024) waiting on two other jobs of step3 itself.
    taskN3 = new_task("step3")
    taskN3.dependencies = [
        job("000020", []),
        job("000021", needs(taskN2, "000010", "000011")),
        job("000022", needs(taskN2, "000011", "000012")),
        job("000023", []),
        job("000024", needs(taskN3, "000021", "000023")),
    ]

    # step4: six independent jobs.
    # NOTE(review): "00004" * 1000 yields 5001-character job names, unlike
    # step1/step5 -- presumably a deliberate long-name test; confirm.
    taskN4 = new_task("step4")
    taskN4.dependencies = independent_jobs("00004" * 1000)

    # step5: six independent jobs 000050 .. 000055.
    taskN5 = new_task("step5")
    taskN5.dependencies = independent_jobs("00005")

    # (task, queue, priority, pseudo collection number) for each work.
    # NOTE(review): the step4 and step5 works reuse collection #1 and
    # priority 981 exactly like the step1 work -- confirm this is intended.
    plan = (
        (taskN1, task_queue, 981, 1),
        (taskN2, task_queue1, 881, 2),
        (taskN3, task_queue2, 781, 3),
        (taskN4, task_queue3, 981, 1),
        (taskN5, task_queue4, 981, 1),
    )
    # All works are built before the Workflow object is created.
    works = [panda_work(*spec) for spec in plan]

    workflow = Workflow(pending_time=12)
    for work in works:
        workflow.add_work(work)
    workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
    return workflow
0263
0264
if __name__ == '__main__':
    # Resolve the REST host, build the test workflow, then submit it through
    # a ClientManager and print the request id returned by submit().
    rest_host = get_rest_host()
    test_workflow = setup_workflow()

    manager = ClientManager(host=rest_host)
    print(manager.submit(test_workflow, use_dataset_name=False))