File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 """
0014 Test client.
0015 """
0016
0017 import sys
0018 import string
0019 import random
0020 import time
0021
0022
0023
0024
0025
0026
0027
0028 from idds.client.clientmanager import ClientManager
0029
0030 from idds.common.utils import get_rest_host
0031
0032
0033
0034
0035
0036 from idds.workflowv2.workflow import Workflow
0037
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039
0040
0041 if len(sys.argv) > 1 and sys.argv[1] == "in2p3":
0042 site = 'in2p3'
0043 task_cloud = 'EU'
0044
0045 task_queue = 'CC-IN2P3_Rubin'
0046 task_queue1 = 'CC-IN2P3_Rubin'
0047 task_queue2 = 'CC-IN2P3_Rubin'
0048 task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
0049 task_queue4 = 'CC-IN2P3_Rubin_Merge'
0050
0051 task_queue5 = 'CC-IN2P3_Rubin_Extra_Himem'
0052 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
0053 site = 'lancs'
0054 task_cloud = 'EU'
0055
0056 task_queue = 'LANCS_Rubin'
0057 task_queue1 = 'LANCS_Rubin'
0058 task_queue2 = 'LANCS_Rubin'
0059 task_queue3 = 'LANCS_Rubin_Extra_Himem'
0060
0061 task_queue4 = 'LANCS_Rubin_Merge'
0062
0063 task_queue5 = 'LANCS_Rubin_Extra_Himem'
0064 elif len(sys.argv) > 1 and sys.argv[1] == "ral":
0065 site = 'RAL'
0066 task_cloud = 'EU'
0067
0068 task_queue = 'RAL_Rubin'
0069 task_queue1 = 'RAL_Rubin'
0070 task_queue2 = 'RAL_Rubin'
0071 task_queue1 = task_queue
0072 task_queue2 = task_queue
0073 task_queue3 = 'RAL_Rubin_Extra_Himem'
0074
0075 task_queue4 = 'RAL_Rubin_Merge'
0076
0077 task_queue5 = 'RAL_Rubin_Extra_Himem'
0078 else:
0079 site = 'slac'
0080
0081 task_cloud = 'US'
0082
0083 task_queue = 'DOMA_LSST_GOOGLE_TEST'
0084
0085
0086
0087 task_queue = 'SLAC_Rubin'
0088 task_queue1 = 'SLAC_Rubin'
0089 task_queue2 = 'SLAC_Rubin'
0090 task_queue3 = 'SLAC_Rubin_Extra_Himem'
0091 task_queue4 = 'SLAC_Rubin_Merge'
0092 task_queue5 = 'SLAC_Rubin'
0093
0094
0095 task_queue2 = 'SLAC_Rubin'
0096
0097
0098
0099
0100
0101 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0102 return ''.join(random.choice(chars) for _ in range(N))
0103
0104
0105 class PanDATask(object):
0106 name = None
0107 step = None
0108 dependencies = []
0109
0110
0111 def setup_workflow():
0112
0113 taskN1 = PanDATask()
0114 taskN1.step = "step1"
0115 taskN1.name = site + "_" + taskN1.step + "_" + randStr()
0116 taskN1.dependencies = [
0117 {"name": "00000" + str(k),
0118 "order_id": k,
0119 "dependencies": [],
0120 "submitted": False} for k in range(6)
0121 ]
0122
0123 taskN2 = PanDATask()
0124 taskN2.step = "step2"
0125 taskN2.name = site + "_" + taskN2.step + "_" + randStr()
0126 taskN2.dependencies = [
0127 {
0128 "name": "000010",
0129 "order_id": 0,
0130 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0131 {"task": taskN1.name, "inputname": "000002", "available": False}],
0132 "submitted": False
0133 },
0134 {
0135 "name": "000011",
0136 "order_id": 1,
0137 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0138 {"task": taskN1.name, "inputname": "000002", "available": False}],
0139 "submitted": False
0140 },
0141 {
0142 "name": "000012",
0143 "order_id": 2,
0144 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0145 {"task": taskN1.name, "inputname": "000002", "available": False}],
0146 "submitted": False
0147 }
0148 ]
0149
0150 taskN3 = PanDATask()
0151 taskN3.step = "step3"
0152 taskN3.name = site + "_" + taskN3.step + "_" + randStr()
0153 taskN3.dependencies = [
0154 {
0155 "name": "000020",
0156 "order_id": 0,
0157 "dependencies": [],
0158 "submitted": False
0159 },
0160 {
0161 "name": "000021",
0162 "order_id": 1,
0163 "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0164 {"task": taskN2.name, "inputname": "000011", "available": False}],
0165 "submitted": False
0166 },
0167 {
0168 "name": "000022",
0169 "order_id": 2,
0170 "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0171 {"task": taskN2.name, "inputname": "000012", "available": False}],
0172 "submitted": False
0173 },
0174 {
0175 "name": "000023",
0176 "order_id": 3,
0177 "dependencies": [],
0178 "submitted": False
0179 },
0180 {
0181 "name": "000024",
0182 "order_id": 4,
0183 "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0184 {"task": taskN3.name, "inputname": "000023", "available": False}],
0185 "submitted": False
0186 },
0187 ]
0188
0189 taskN4 = PanDATask()
0190 taskN4.step = "step4"
0191 taskN4.name = site + "_" + taskN4.step + "_" + randStr()
0192 taskN4.dependencies = [
0193 {"name": "00004" + str(k),
0194 "order_id": k,
0195 "dependencies": [],
0196 "submitted": False} for k in range(6)
0197 ]
0198
0199 taskN5 = PanDATask()
0200 taskN5.step = "step5"
0201 taskN5.name = site + "_" + taskN5.step + "_" + randStr()
0202 taskN5.dependencies = [
0203 {"name": "00005" + str(k),
0204 "order_id": k,
0205 "dependencies": [],
0206 "submitted": False} for k in range(6)
0207 ]
0208
0209 taskN6 = PanDATask()
0210 taskN6.step = "step6"
0211 taskN6.name = site + "_" + taskN6.step + "_" + randStr()
0212 taskN6.dependencies = [
0213 {"name": "00006" + str(k),
0214 "order_id": k,
0215 "dependencies": [],
0216 "submitted": False} for k in range(6)
0217 ]
0218
0219 taskN7 = PanDATask()
0220 taskN7.step = "step7"
0221 taskN7.name = site + "_" + taskN7.step + "_" + randStr()
0222 taskN7.dependencies = [
0223 {"name": "00007" + str(k),
0224 "order_id": k,
0225 "dependencies": [],
0226 "submitted": False} for k in range(6)
0227 ]
0228
0229 work1 = DomaPanDAWork(executable='echo',
0230 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0231 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0232 log_collections=[], dependency_map=taskN1.dependencies,
0233 task_name=taskN1.name, task_queue=task_queue,
0234 encode_command_line=True,
0235 task_priority=981,
0236 prodSourceLabel='managed',
0237 task_log={"dataset": "PandaJob_#{pandaid}/",
0238 "destination": "local",
0239 "param_type": "log",
0240 "token": "local",
0241 "type": "template",
0242 "value": "log.tgz"},
0243 task_cloud=task_cloud)
0244 work2 = DomaPanDAWork(executable='echo',
0245 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0246 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0247 log_collections=[], dependency_map=taskN2.dependencies,
0248 task_name=taskN2.name, task_queue=task_queue1,
0249 encode_command_line=True,
0250 task_priority=881,
0251 prodSourceLabel='managed',
0252 task_log={"dataset": "PandaJob_#{pandaid}/",
0253 "destination": "local",
0254 "param_type": "log",
0255 "token": "local",
0256 "type": "template",
0257 "value": "log.tgz"},
0258 task_cloud=task_cloud)
0259 work3 = DomaPanDAWork(executable='echo',
0260 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0261 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0262 log_collections=[], dependency_map=taskN3.dependencies,
0263 task_name=taskN3.name, task_queue=task_queue2,
0264 encode_command_line=True,
0265 task_priority=781,
0266 prodSourceLabel='managed',
0267 task_log={"dataset": "PandaJob_#{pandaid}/",
0268 "destination": "local",
0269 "param_type": "log",
0270 "token": "local",
0271 "type": "template",
0272 "value": "log.tgz"},
0273 task_cloud=task_cloud)
0274
0275 work4 = DomaPanDAWork(executable='echo',
0276 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0277 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0278 log_collections=[], dependency_map=taskN4.dependencies,
0279 task_name=taskN4.name, task_queue=task_queue3,
0280 encode_command_line=True,
0281 task_priority=981,
0282 core_count=2,
0283 task_rss=32000,
0284 prodSourceLabel='managed',
0285 task_log={"dataset": "PandaJob_#{pandaid}/",
0286 "destination": "local",
0287 "param_type": "log",
0288 "token": "local",
0289 "type": "template",
0290 "value": "log.tgz"},
0291 task_cloud=task_cloud)
0292
0293 work5 = DomaPanDAWork(executable='echo',
0294 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0295 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0296 log_collections=[], dependency_map=taskN5.dependencies,
0297 task_name=taskN5.name, task_queue=task_queue4,
0298 encode_command_line=True,
0299 task_priority=981,
0300 prodSourceLabel='managed',
0301 task_log={"dataset": "PandaJob_#{pandaid}/",
0302 "destination": "local",
0303 "param_type": "log",
0304 "token": "local",
0305 "type": "template",
0306 "value": "log.tgz"},
0307 task_cloud=task_cloud)
0308
0309 work6 = DomaPanDAWork(executable='echo',
0310 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0311 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0312 log_collections=[], dependency_map=taskN6.dependencies,
0313 task_name=taskN6.name, task_queue=task_queue5,
0314 encode_command_line=True,
0315 task_priority=981,
0316 prodSourceLabel='managed',
0317 task_log={"dataset": "PandaJob_#{pandaid}/",
0318 "destination": "local",
0319 "param_type": "log",
0320 "token": "local",
0321 "type": "template",
0322 "value": "log.tgz"},
0323 task_cloud=task_cloud)
0324
0325 work7 = DomaPanDAWork(executable='echo',
0326 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0327 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0328 log_collections=[], dependency_map=taskN7.dependencies,
0329 task_name=taskN7.name, task_queue=task_queue3,
0330 encode_command_line=True,
0331 task_priority=981,
0332 core_count=2,
0333 prodSourceLabel='managed',
0334 task_log={"dataset": "PandaJob_#{pandaid}/",
0335 "destination": "local",
0336 "param_type": "log",
0337 "token": "local",
0338 "type": "template",
0339 "value": "log.tgz"},
0340 task_cloud=task_cloud)
0341
0342 pending_time = 12
0343
0344 workflow = Workflow(pending_time=pending_time)
0345 workflow.add_work(work1)
0346 workflow.add_work(work2)
0347 workflow.add_work(work3)
0348 workflow.add_work(work4)
0349 workflow.add_work(work5)
0350 workflow.add_work(work6)
0351 workflow.add_work(work7)
0352 workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
0353 return workflow
0354
0355
0356 if __name__ == '__main__':
0357 host = get_rest_host()
0358 workflow = setup_workflow()
0359
0360 wm = ClientManager(host=host)
0361
0362 request_id = wm.submit(workflow, use_dataset_name=False)
0363 print(request_id)