File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 """
0014 Test client.
0015 """
0016
0017 import sys
0018 import string
0019 import random
0020 import time
0021
0022
0023
0024
0025
0026
0027
0028 from idds.client.clientmanager import ClientManager
0029
0030 from idds.common.utils import get_rest_host
0031
0032
0033
0034
0035
0036 from idds.workflowv2.workflow import Workflow
0037
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039
0040
0041 if len(sys.argv) > 1 and sys.argv[1] == "in2p3":
0042 site = 'in2p3'
0043 task_cloud = 'EU'
0044
0045 task_queue = 'CC-IN2P3_Rubin'
0046 task_queue1 = 'CC-IN2P3_Rubin_Medium'
0047 task_queue2 = 'CC-IN2P3_Rubin_Himem'
0048 task_queue3 = 'CC-IN2P3_Rubin_Big_Himem'
0049 task_queue4 = 'CC-IN2P3_Rubin_Extra_Himem'
0050 task_queue5 = 'CC-IN2P3_Rubin_Merge'
0051 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
0052 site = 'lancs'
0053 task_cloud = 'EU'
0054
0055 task_queue = 'LANCS_Rubin'
0056 task_queue1 = 'LANCS_Rubin_Medium'
0057 task_queue2 = 'LANCS_Rubin_Himem'
0058 task_queue3 = 'LANCS_Rubin_Big_Himem'
0059 task_queue4 = 'LANCS_Rubin_Extra_Himem'
0060
0061 task_queue5 = 'LANCS_Rubin_Merge'
0062 elif len(sys.argv) > 1 and sys.argv[1] == "ral":
0063 site = 'RAL'
0064 task_cloud = 'EU'
0065
0066 task_queue = 'RAL_Rubin'
0067 task_queue1 = 'RAL_Rubin_Medium'
0068 task_queue2 = 'RAL_Rubin_Himem'
0069 task_queue3 = 'RAL_Rubin_Big_Himem'
0070
0071 task_queue4 = 'RAL_Rubin_Merge'
0072
0073 task_queue5 = 'RAL_Rubin_Extra_Himem'
0074 else:
0075 site = 'slac'
0076
0077 task_cloud = 'US'
0078
0079 task_queue = 'DOMA_LSST_GOOGLE_TEST'
0080
0081
0082
0083 task_queue = 'SLAC_Rubin'
0084 task_queue1 = 'SLAC_Rubin_Medium'
0085 task_queue2 = 'SLAC_Rubin_Himem'
0086 task_queue3 = 'SLAC_Rubin_Big_Himem'
0087 task_queue4 = 'SLAC_Rubin_Extra_Himem'
0088 task_queue5 = 'SLAC_Rubin_Merge'
0089
0090
0091
0092
0093
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of length *N* drawn from *chars*.

    By default produces a 10-character lowercase-alphanumeric suffix used
    to make task names unique.
    """
    picked = []
    for _ in range(N):
        picked.append(random.choice(chars))
    return ''.join(picked)
0096
0097
class PanDATask(object):
    """Lightweight container describing one pseudo PanDA task.

    Holds the task name, its processing step label, and the list of
    pseudo-input dependency records consumed by DomaPanDAWork.
    """

    def __init__(self):
        # Use instance attributes rather than class attributes: the original
        # class-level ``dependencies = []`` was a mutable default shared by
        # every instance, so in-place mutation on one task would leak into
        # all others.
        self.name = None           # full task name, e.g. "<site>_<step>_<rand>"
        self.step = None           # step label, e.g. "step1"
        self.dependencies = []     # list of {"name", "dependencies", "submitted"} dicts
0102
0103
def _new_task(step):
    """Create a PanDATask for *step* with a unique, site-prefixed name."""
    task = PanDATask()
    task.step = step
    task.name = site + "_" + task.step + "_" + randStr()
    return task


def _bulk_deps(prefix, count):
    """Return *count* independent pseudo inputs named prefix+k with no deps."""
    return [{"name": prefix + str(k),
             "dependencies": [],
             "submitted": False} for k in range(count)]


def _make_work(task, queue, priority, collection_id):
    """Build a DomaPanDAWork for *task* on *queue*.

    All works share the same trivial executable and log template; only the
    task metadata, queue, priority and pseudo collection id vary.
    """
    return DomaPanDAWork(executable='echo; sleep 180',
                         primary_input_collection={'scope': 'pseudo_dataset',
                                                   'name': 'pseudo_input_collection#%d' % collection_id},
                         output_collections=[{'scope': 'pseudo_dataset',
                                              'name': 'pseudo_output_collection#%d' % collection_id}],
                         log_collections=[],
                         dependency_map=task.dependencies,
                         task_name=task.name,
                         task_queue=queue,
                         encode_command_line=True,
                         task_priority=priority,
                         prodSourceLabel='managed',
                         task_log={"dataset": "PandaJob_#{pandaid}/",
                                   "destination": "local",
                                   "param_type": "log",
                                   "token": "local",
                                   "type": "template",
                                   "value": "log.tgz"},
                         task_cloud=task_cloud)


def setup_workflow():
    """Build and return the test Workflow.

    Constructs six pseudo tasks (step1..step6) with synthetic dependency
    maps, wraps each in a DomaPanDAWork, and adds work1 to the workflow
    (works 2-6 are currently disabled for this test).
    """
    # step1: 10000 independent pseudo inputs.
    taskN1 = _new_task("step1")
    taskN1.dependencies = _bulk_deps("00000", 10000)

    # step2: three inputs that each depend on two step1 outputs, plus
    # 10000 independent bulk inputs.
    taskN2 = _new_task("step2")
    taskN2.dependencies = [
        {"name": name,
         "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
                          {"task": taskN1.name, "inputname": "000002", "available": False}],
         "submitted": False}
        for name in ("000010", "000011", "000012")
    ]
    taskN2.dependencies += _bulk_deps("000013", 10000)

    # step3: a mix of free inputs, inputs depending on step2 outputs, and
    # one input ("000024") depending on step3's own outputs.
    taskN3 = _new_task("step3")
    taskN3.dependencies = [
        {"name": "000020",
         "dependencies": [],
         "submitted": False},
        {"name": "000021",
         "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
                          {"task": taskN2.name, "inputname": "000011", "available": False}],
         "submitted": False},
        {"name": "000022",
         "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
                          {"task": taskN2.name, "inputname": "000012", "available": False}],
         "submitted": False},
        {"name": "000023",
         "dependencies": [],
         "submitted": False},
        {"name": "000024",
         "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
                          {"task": taskN3.name, "inputname": "000023", "available": False}],
         "submitted": False},
    ]
    taskN3.dependencies += _bulk_deps("000025", 10000)

    # step4-step6: purely independent bulk inputs (step5/step6 reuse the
    # same "00005" input-name prefix, as in the original test).
    taskN4 = _new_task("step4")
    taskN4.dependencies = _bulk_deps("00004", 10000)

    taskN5 = _new_task("step5")
    taskN5.dependencies = _bulk_deps("00005", 100)

    # BUG FIX: taskN6's name was previously built from taskN5.step, so the
    # step6 task was misnamed "<site>_step5_<rand>".  It now uses its own
    # step label.
    taskN6 = _new_task("step6")
    taskN6.dependencies = _bulk_deps("00005", 100)

    # One work per task; works 2-6 are built to mirror the full test even
    # though only work1 is currently added to the workflow below.
    work1 = _make_work(taskN1, task_queue, 981, 1)
    work2 = _make_work(taskN2, task_queue1, 881, 2)
    work3 = _make_work(taskN3, task_queue2, 781, 3)
    work4 = _make_work(taskN4, task_queue3, 981, 1)
    work5 = _make_work(taskN5, task_queue4, 981, 1)
    work6 = _make_work(taskN6, task_queue5, 981, 1)

    pending_time = 12

    workflow = Workflow(pending_time=pending_time)
    workflow.add_work(work1)
    # Intentionally disabled for this test run; re-enable as needed:
    # workflow.add_work(work2)
    # workflow.add_work(work3)
    # workflow.add_work(work4)
    # workflow.add_work(work5)
    # workflow.add_work(work6)
    workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
    return workflow
0319
0320
if __name__ == '__main__':
    # Submit the test workflow to the iDDS REST service and print the
    # request id assigned by the server.
    host = get_rest_host()
    workflow = setup_workflow()

    client = ClientManager(host=host)
    request_id = client.submit(workflow, use_dataset_name=False)
    print(request_id)