# File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
"""
Test client: builds a five-task DOMA PanDA event-service test workflow
and submits it to an iDDS server via the REST client.
"""
0016
0017 import json
0018 import sys
0019 import string
0020 import random
0021 import time
0022
0023
0024
0025
0026
0027
0028
0029 from idds.client.clientmanager import ClientManager
0030
0031 from idds.common.utils import get_rest_host
0032
0033
0034
0035
0036
0037 from idds.workflowv2.workflow import Workflow
0038
0039 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0040
0041
# Select the PanDA site, cloud and task queues from the first command-line
# argument ("in2p3" or "lancs"); anything else falls back to the SLAC test
# queue. The *task_queueN* globals are consumed by setup_workflow().
if len(sys.argv) > 1 and sys.argv[1] == "in2p3":
    site = 'in2p3'
    task_cloud = 'EU'

    task_queue = 'CC-IN2P3_Rubin'
    task_queue1 = 'CC-IN2P3_Rubin_Medium'
    task_queue2 = 'CC-IN2P3_Rubin_Himem'
    task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
    task_queue4 = 'CC-IN2P3_Rubin_Merge'
elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
    site = 'lancs'
    task_cloud = 'EU'

    task_queue = 'LANCS_Rubin'
    task_queue1 = 'LANCS_Rubin_Medium'
    task_queue2 = 'LANCS_Rubin_Himem'
    # NOTE(review): an earlier assignment of 'LANCS_Rubin_Extra_Himem' was
    # immediately shadowed by the line below. The other sites use their
    # *_Extra_Himem queue here — confirm which value is intended.
    task_queue3 = 'LANCS_Rubin_Himem'
    task_queue4 = 'LANCS_Rubin_Merge'
else:
    site = 'slac'
    task_cloud = 'US'

    # Dead assignments removed: 'DOMA_LSST_GOOGLE_TEST' and the
    # 'SLAC_Rubin*' queues were all unconditionally overwritten by the
    # 'SLAC_TEST' values below, so only the effective values are kept.
    task_queue = 'SLAC_TEST'
    task_queue4 = task_queue3 = task_queue2 = task_queue1 = task_queue
0079
0080
0081
0082
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a pseudo-random identifier of length *N* drawn from *chars*."""
    picked = [random.choice(chars) for _ in range(N)]
    return ''.join(picked)
0085
0086
class PanDATask(object):
    """Lightweight container describing one PanDA task.

    Holds the task's generated name, its processing step label, and the
    job-level dependency map consumed by DomaPanDAWork.
    """

    def __init__(self, name=None, step=None, dependencies=None):
        # These were previously class attributes; the mutable
        # ``dependencies = []`` default was shared across all instances,
        # so appending via one task would leak into every other task.
        # Per-instance state avoids that pitfall.
        self.name = name
        self.step = step
        self.dependencies = [] if dependencies is None else dependencies
0091
0092
def setup_workflow():
    """Build a five-task DOMA PanDA event-service test workflow.

    Creates five PanDATask definitions (step1..step5) with a small
    dependency graph between them, wraps each in a DomaPanDAWork, and
    returns a Workflow containing all five works.

    Relies on the module globals ``site``, ``task_cloud`` and
    ``task_queue``..``task_queue4`` set by the command-line selection at
    the top of this file.

    Returns:
        Workflow: the assembled workflow, ready to submit via ClientManager.
    """
    # Shared template for each work's log dataset; copied per work so no
    # dict instance is shared between tasks.
    log_template = {"dataset": "PandaJob_#{pandaid}/",
                    "destination": "local",
                    "param_type": "log",
                    "token": "local",
                    "type": "template",
                    "value": "log.tgz"}

    # Maps step label -> {order_id (str): job name}.
    # NOTE(review): es_map is built here but never written to es_map_file
    # or otherwise used — confirm whether it should be dumped to the JSON
    # file referenced in ``executable`` below.
    es_map = {}

    def new_task(step):
        # The name is generated before dependencies are attached so that
        # later tasks (and intra-task deps) can reference task.name.
        task = PanDATask()
        task.step = step
        task.name = site + "_" + step + "_" + randStr()
        return task

    def fanout_deps(prefix):
        # Six independent pseudo inputs named <prefix>0 .. <prefix>5.
        return [{"name": prefix + str(k),
                 "order_id": k,
                 "dependencies": [],
                 "submitted": False} for k in range(6)]

    def dep(task, inputname):
        # One upstream-input dependency entry.
        return {"task": task.name, "inputname": inputname, "available": False}

    def register(task):
        es_map[task.step] = {str(item["order_id"]): item["name"]
                             for item in task.dependencies}

    taskN1 = new_task("step1")
    taskN1.dependencies = fanout_deps("00000")
    register(taskN1)

    taskN2 = new_task("step2")
    taskN2.dependencies = [
        {"name": "000010",
         "order_id": 0,
         "dependencies": [dep(taskN1, "000001"), dep(taskN1, "000002")],
         "submitted": False},
        {"name": "000011",
         "order_id": 1,
         "dependencies": [dep(taskN1, "000001"), dep(taskN1, "000002")],
         "submitted": False},
        {"name": "000012",
         "order_id": 2,
         "dependencies": [dep(taskN1, "000001"), dep(taskN1, "000002")],
         "submitted": False},
    ]
    register(taskN2)

    taskN3 = new_task("step3")
    taskN3.dependencies = [
        {"name": "000020",
         "order_id": 0,
         "dependencies": [],
         "submitted": False},
        {"name": "000021",
         "order_id": 1,
         "dependencies": [dep(taskN2, "000010"), dep(taskN2, "000011")],
         "submitted": False},
        {"name": "000022",
         "order_id": 2,
         "dependencies": [dep(taskN2, "000011"), dep(taskN2, "000012")],
         "submitted": False},
        {"name": "000023",
         "order_id": 3,
         "dependencies": [],
         "submitted": False},
        # order_id 4 depends on outputs of the *same* task (intra-task
        # dependencies) and carries a "groups" key that the other entries
        # do not.
        {"name": "000024",
         "order_id": 4,
         "groups": taskN3.name,
         "dependencies": [dep(taskN3, "000021"), dep(taskN3, "000023")],
         "submitted": False},
    ]
    register(taskN3)

    taskN4 = new_task("step4")
    taskN4.dependencies = fanout_deps("00004")
    register(taskN4)

    taskN5 = new_task("step5")
    taskN5.dependencies = fanout_deps("00005")
    register(taskN5)

    es_map_file = "/sdf/data/rubin/panda_jobs/panda_env_pilot/test_rubin_es_map.json"
    executable = "export RUBIN_ES_CORES=4; echo; RUBIN_ES_MAP_FILE=%s; echo ${IN/L}" % es_map_file

    def make_work(task, collection_id, queue, priority, **extra):
        """Wrap *task* in a DomaPanDAWork on *queue* with *priority*.

        *extra* forwards additional DomaPanDAWork keyword arguments
        (e.g. max_events_per_job for the first work).
        """
        return DomaPanDAWork(
            executable=executable,
            primary_input_collection={'scope': 'pseudo_dataset',
                                      'name': 'pseudo_input_collection#%d' % collection_id},
            output_collections=[{'scope': 'pseudo_dataset',
                                 'name': 'pseudo_output_collection#%d' % collection_id}],
            log_collections=[],
            dependency_map=task.dependencies,
            task_name=task.name,
            task_queue=queue,
            encode_command_line=True,
            task_priority=priority,
            es=True,
            es_label=task.step,
            prodSourceLabel='managed',
            task_log=dict(log_template),
            task_cloud=task_cloud,
            **extra)

    # Collection ids, queues and priorities reproduce the original
    # hand-written works (works 4 and 5 reuse collection #1).
    work1 = make_work(taskN1, 1, task_queue, 981, max_events_per_job=100)
    work2 = make_work(taskN2, 2, task_queue1, 881)
    work3 = make_work(taskN3, 3, task_queue2, 781)
    work4 = make_work(taskN4, 1, task_queue3, 981)
    work5 = make_work(taskN5, 1, task_queue4, 981)

    pending_time = 12  # NOTE(review): units not shown here — presumably hours; confirm against Workflow.

    workflow = Workflow(pending_time=pending_time)
    for work in (work1, work2, work3, work4, work5):
        workflow.add_work(work)
    workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
    return workflow
0310
0311
if __name__ == '__main__':
    # Build the test workflow and submit it to the iDDS REST service,
    # then report the request id assigned by the server.
    rest_host = get_rest_host()
    test_workflow = setup_workflow()

    client = ClientManager(host=rest_host)

    req_id = client.submit(test_workflow, use_dataset_name=False)
    print(req_id)