File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 """
0014 Test client: build a five-task DomaPanDA workflow and submit it through the iDDS ClientManager.
0015 """
0016
0017 import sys
0018 import string
0019 import random
0020 import time
0021
0022
0023
0024
0025
0026
0027
0028 from idds.client.clientmanager import ClientManager
0029
0030 from idds.common.utils import get_rest_host
0031
0032
0033
0034
0035
0036 from idds.workflowv2.workflow import Workflow
0037
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039
0040
# Site selection: the optional first command-line argument picks the PanDA
# site, cloud and queue names used by this test. Any other value (or no
# argument at all) falls back to the SLAC settings.
_requested_site = sys.argv[1] if len(sys.argv) > 1 else None

if _requested_site == "in2p3":
    site = 'in2p3'
    panda_site = "CC-IN2P3"
    task_cloud = 'EU'

    task_queue = 'CC-IN2P3_Rubin'
    task_queue1 = 'CC-IN2P3_Rubin_Medium'
    task_queue2 = 'CC-IN2P3_Rubin_Himem'
    task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
    task_queue4 = 'CC-IN2P3_Rubin_Merge'
elif _requested_site == "lancs":
    site = 'lancs'
    panda_site = "LANCS"
    task_cloud = 'EU'

    task_queue = 'LANCS_Rubin'
    task_queue1 = 'LANCS_Rubin_Medium'
    task_queue2 = 'LANCS_Rubin_Himem'
    # An earlier 'LANCS_Rubin_Extra_Himem' assignment was immediately
    # overwritten by this value; only the effective one is kept.
    task_queue3 = 'LANCS_Rubin_Himem'
    task_queue4 = 'LANCS_Rubin_Merge'
else:
    site = 'slac'
    panda_site = "SLAC"
    task_cloud = 'US'

    # An earlier 'DOMA_LSST_GOOGLE_TEST' assignment to task_queue was
    # immediately overwritten by this value; only the effective one is kept.
    task_queue = 'SLAC_Rubin'
    task_queue1 = 'SLAC_Rubin_Medium'
    task_queue2 = 'SLAC_Rubin_Himem'
    task_queue3 = 'SLAC_Rubin_Extra_Himem'
    task_queue4 = 'SLAC_Rubin_Merge'
0077
0078
0079
0080
0081
0082
0083
0084
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of ``N`` characters drawn from ``chars``.

    Args:
        chars: sequence of candidate characters; defaults to lowercase ASCII
            letters plus digits.
        N: number of characters to draw (a non-positive value yields '').

    Returns:
        The concatenation of ``N`` independent ``random.choice(chars)`` draws.
    """
    picked = []
    for _ in range(N):
        picked.append(random.choice(chars))
    return ''.join(picked)
0087
0088
class PanDATask(object):
    """Plain record describing one PanDA task of the test workflow.

    Attributes:
        name: task name; None until assigned by the caller.
        step: step label; None until assigned by the caller.
        dependencies: list of job entries for the task.
    """
    # Class-level defaults are kept so attribute access on the class itself
    # keeps working as before.
    name = None
    step = None
    dependencies = []

    def __init__(self):
        # Give every instance its own list: the class-level list is shared,
        # so an in-place append on one task would otherwise show up on all.
        self.dependencies = []
0093
0094
def _new_task(step):
    """Return a PanDATask for ``step`` named ``<site>_<step>_<random suffix>``."""
    task = PanDATask()
    task.step = step
    task.name = site + "_" + task.step + "_" + randStr()
    return task


def _job(name, order_id, inputs=()):
    """Return one job entry depending on the given (task name, input name) pairs."""
    return {
        "name": name,
        "order_id": order_id,
        "dependencies": [{"task": task_name, "inputname": input_name, "available": False}
                         for task_name, input_name in inputs],
        "submitted": False
    }


def _independent_jobs(prefix, count=6):
    """Return ``count`` dependency-free job entries named ``prefix + index``."""
    return [_job(prefix + str(k), k) for k in range(count)]


def _make_work(task, collection_index, task_rss, task_priority):
    """Return the DomaPanDAWork for ``task``; only the arguments vary between steps."""
    suffix = '#%d' % collection_index
    return DomaPanDAWork(executable='echo',
                         primary_input_collection={'scope': 'pseudo_dataset',
                                                   'name': 'pseudo_input_collection' + suffix},
                         output_collections=[{'scope': 'pseudo_dataset',
                                              'name': 'pseudo_output_collection' + suffix}],
                         log_collections=[], dependency_map=task.dependencies,
                         task_name=task.name, task_queue=None, task_site=panda_site,
                         task_rss=task_rss,
                         encode_command_line=True,
                         task_priority=task_priority,
                         prodSourceLabel='managed',
                         # A fresh dict per work, as in the original literals.
                         task_log={"dataset": "PandaJob_#{pandaid}/",
                                   "destination": "local",
                                   "param_type": "log",
                                   "token": "local",
                                   "type": "template",
                                   "value": "log.tgz"},
                         task_cloud=task_cloud)


def setup_workflow():
    """Build the five-step test workflow.

    Creates five PanDATask records (step1..step5), wraps each one in a
    DomaPanDAWork that runs ``echo`` on the module-level ``panda_site`` and
    ``task_cloud``, and adds the works to a Workflow in step order.

    Returns:
        The Workflow, named ``<site>_test_workflow.idds.<time.time()>.test``.
    """
    # Tasks are created in step order so the random name suffixes are drawn
    # in the same sequence as before.
    taskN1 = _new_task("step1")
    taskN1.dependencies = _independent_jobs("00000")

    # Every step2 job waits on the same two step1 jobs.
    taskN2 = _new_task("step2")
    step1_inputs = [(taskN1.name, "000001"), (taskN1.name, "000002")]
    taskN2.dependencies = [
        _job("000010", 0, step1_inputs),
        _job("000011", 1, step1_inputs),
        _job("000012", 2, step1_inputs),
    ]

    # step3 mixes free jobs, jobs waiting on step2, and one job ("000024")
    # that depends on other jobs of step3 itself.
    taskN3 = _new_task("step3")
    taskN3.dependencies = [
        _job("000020", 0),
        _job("000021", 1, [(taskN2.name, "000010"), (taskN2.name, "000011")]),
        _job("000022", 2, [(taskN2.name, "000011"), (taskN2.name, "000012")]),
        _job("000023", 3),
        _job("000024", 4, [(taskN3.name, "000021"), (taskN3.name, "000023")]),
    ]

    taskN4 = _new_task("step4")
    taskN4.dependencies = _independent_jobs("00004")

    taskN5 = _new_task("step5")
    taskN5.dependencies = _independent_jobs("00005")

    # NOTE(review): steps 4 and 5 reuse collection index 1 and priority 981,
    # exactly as the original did — confirm this is intended.
    work1 = _make_work(taskN1, 1, task_rss=3000, task_priority=981)
    work2 = _make_work(taskN2, 2, task_rss=7000, task_priority=881)
    work3 = _make_work(taskN3, 3, task_rss=14000, task_priority=781)
    work4 = _make_work(taskN4, 1, task_rss=20000, task_priority=981)
    work5 = _make_work(taskN5, 1, task_rss=33000, task_priority=981)

    pending_time = 12

    workflow = Workflow(pending_time=pending_time)
    for work in (work1, work2, work3, work4, work5):
        workflow.add_work(work)
    workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
    return workflow
0281
0282
if __name__ == '__main__':
    # Resolve the REST endpoint first, then build the workflow, in the same
    # order as before.
    rest_host = get_rest_host()
    test_workflow = setup_workflow()

    # Submit through the client manager and print the value it returns.
    client = ClientManager(host=rest_host)
    submitted_id = client.submit(test_workflow, use_dataset_name=False)
    print(submitted_id)