# File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test client.
0014 """
0015
0016 import os
0017 import sys
0018 import string
0019 import random
0020 import time
0021
0022
0023
0024
0025
0026
0027
0028 from idds.client.clientmanager import ClientManager
0029
0030
0031
0032
0033
0034
0035
0036 from idds.workflowv2.workflow import Workflow
0037
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039
0040
# Target cloud for all tasks submitted by this test client.
task_cloud = 'US'

# PanDA queue to submit to.  An earlier test target is kept below for
# reference; only the active assignment takes effect.
# task_queue = 'DOMA_LSST_GOOGLE_TEST'
task_queue = 'SLAC_Rubin'
0048
0049
0050
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of length *N* sampled (with replacement) from *chars*."""
    return ''.join(random.choices(chars, k=N))
0053
0054
class PanDATask(object):
    """Lightweight holder describing one PanDA task for the test workflow.

    Attributes:
        name: full task name (step label plus a random suffix), or None.
        step: processing step label, e.g. "step1", or None.
        dependencies: list of per-job dependency dicts consumed by
            DomaPanDAWork as its dependency_map.
    """

    def __init__(self):
        self.name = None
        self.step = None
        # Instance-level list.  The original class attribute was a mutable
        # default shared by every instance, so any in-place mutation would
        # have leaked across tasks.
        self.dependencies = []
0059
0060
def _pseudo_work(index, task):
    """Build the DomaPanDAWork for *task*, using pseudo collection *index*.

    All three test works share the same executable, queue, cloud and log
    configuration; only the collection suffix and the task payload differ,
    so the common keyword soup lives here instead of being repeated.
    """
    return DomaPanDAWork(executable='echo',
                         primary_input_collection={'scope': 'pseudo_dataset',
                                                   'name': 'pseudo_input_collection#%s' % index},
                         output_collections=[{'scope': 'pseudo_dataset',
                                              'name': 'pseudo_output_collection#%s' % index}],
                         log_collections=[],
                         dependency_map=task.dependencies,
                         task_name=task.name,
                         task_queue=task_queue,
                         encode_command_line=True,
                         prodSourceLabel='managed',
                         task_log={"dataset": "PandaJob_#{pandaid}/",
                                   "destination": "local",
                                   "param_type": "log",
                                   "token": "local",
                                   "type": "template",
                                   "value": "log.tgz"},
                         task_cloud=task_cloud)


def setup_workflow():
    """Create a three-step test workflow with inter-job file dependencies.

    step1 produces six pseudo outputs; step2 jobs consume step1 outputs;
    step3 jobs consume step2 outputs (one step3 job also consumes other
    step3 outputs).  Returns the assembled Workflow, named with a
    timestamp so repeated submissions stay distinguishable.
    """
    taskN1 = PanDATask()
    taskN1.step = "step1"
    taskN1.name = taskN1.step + "_" + randStr()
    # Six independent jobs with no upstream inputs.
    taskN1.dependencies = [
        {"name": "00000" + str(k),
         "dependencies": [],
         "submitted": False} for k in range(6)
    ]

    taskN2 = PanDATask()
    taskN2.step = "step2"
    taskN2.name = taskN2.step + "_" + randStr()
    # Each step2 job waits on two step1 outputs.
    taskN2.dependencies = [
        {
            "name": "000010",
            "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
                             {"task": taskN1.name, "inputname": "000002", "available": False}],
            "submitted": False
        },
        {
            "name": "000011",
            "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
                             {"task": taskN1.name, "inputname": "000002", "available": False}],
            "submitted": False
        },
        {
            "name": "000012",
            "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
                             {"task": taskN1.name, "inputname": "000002", "available": False}],
            "submitted": False
        }
    ]

    taskN3 = PanDATask()
    taskN3.step = "step3"
    taskN3.name = taskN3.step + "_" + randStr()
    # Mix of independent jobs, jobs waiting on step2, and one job
    # ("000024") waiting on other step3 outputs (intra-task dependency).
    taskN3.dependencies = [
        {
            "name": "000020",
            "dependencies": [],
            "submitted": False
        },
        {
            "name": "000021",
            "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
                             {"task": taskN2.name, "inputname": "000011", "available": False}],
            "submitted": False
        },
        {
            "name": "000022",
            "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
                             {"task": taskN2.name, "inputname": "000012", "available": False}],
            "submitted": False
        },
        {
            "name": "000023",
            "dependencies": [],
            "submitted": False
        },
        {
            "name": "000024",
            "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
                             {"task": taskN3.name, "inputname": "000023", "available": False}],
            "submitted": False
        },
    ]

    work1 = _pseudo_work(1, taskN1)
    work2 = _pseudo_work(2, taskN2)
    work3 = _pseudo_work(3, taskN3)

    pending_time = 12

    workflow = Workflow(pending_time=pending_time)
    workflow.add_work(work1)
    workflow.add_work(work2)
    workflow.add_work(work3)
    workflow.name = 'test_workflow.idds.%s.test' % time.time()
    return workflow
0181
0182
if __name__ == '__main__':

    # iDDS REST endpoint used for this test submission.
    idds_host = "https://aipanda160.cern.ch:443/idds"

    # The build request id and its signature are handed to this client via
    # environment variables by the step that scheduled the build request.
    request_id = os.environ.get("IDDS_BUILD_REQUEST_ID", None)
    # NOTE(review): "IDDS_BUIL_SIGNATURE" looks like a typo for
    # "IDDS_BUILD_SIGNATURE", but the name must match whatever the
    # producer side exports -- confirm both ends before renaming.
    signature = os.environ.get("IDDS_BUIL_SIGNATURE", None)

    # Fail fast with a non-zero exit code if either input is missing.
    if request_id is None:
        print("IDDS_BUILD_REQUEST_ID is not defined.")
        sys.exit(-1)
    if signature is None:
        print("IDDS_BUIL_SIGNATURE is not defined")
        sys.exit(-1)

    workflow = setup_workflow()

    # Submit the workflow as an update to the existing build request and
    # propagate the server-reported status as the process exit code.
    wm = ClientManager(host=idds_host)
    wm.setup_json_outputs()
    ret = wm.update_build_request(request_id, signature, workflow)
    print(ret)
    if ret and 'status' in ret:
        sys.exit(ret['status'])
    else:
        sys.exit(-1)