# File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test client.
0014 """
0015
0016 import os
0017 import sys
0018 import string
0019 import random
0020 import time
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 from idds.workflowv2.workflow import Workflow
0037
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039
0040 import idds.common.utils as idds_utils
0041 import pandaclient.idds_api
0042
0043
# Submission target for all tasks in this test workflow.
task_cloud = 'US'

# Alternative queue kept for reference (was previously assigned and then
# immediately overwritten -- the dead assignment is removed here):
# task_queue = 'DOMA_LSST_GOOGLE_TEST'
task_queue = 'SLAC_Rubin'
0051
0052
0053
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of length *N* drawn from *chars*.

    Defaults to 10 lowercase-alphanumeric characters; used to give test
    tasks unique names.
    """
    picked = []
    for _ in range(N):
        picked.append(random.choice(chars))
    return ''.join(picked)
0056
0057
class PanDATask(object):
    """Lightweight record describing one task: a name, a step label and a
    list of per-job dependency dicts (see setup_workflow for the shape)."""

    name = None   # task name, e.g. "step1_abc123"
    step = None   # processing step label, e.g. "step1"

    def __init__(self):
        # Per-instance list.  The original class-level ``dependencies = []``
        # would be shared by every instance that never reassigned it, so any
        # in-place append would leak across tasks.
        self.dependencies = []
0062
0063
def _make_doma_work(task, index):
    """Build a DomaPanDAWork for *task* with the shared submission settings.

    *index* selects the pseudo input/output collection suffix so each task
    gets distinct collection names.  Factored out because the three tasks
    differed only in their collections, name and dependency map.
    """
    return DomaPanDAWork(
        executable='echo',
        primary_input_collection={'scope': 'pseudo_dataset',
                                  'name': 'pseudo_input_collection#%s' % index},
        output_collections=[{'scope': 'pseudo_dataset',
                             'name': 'pseudo_output_collection#%s' % index}],
        log_collections=[],
        dependency_map=task.dependencies,
        task_name=task.name,
        task_queue=task_queue,
        encode_command_line=True,
        prodSourceLabel='managed',
        task_log={"dataset": "PandaJob_#{pandaid}/",
                  "destination": "local",
                  "param_type": "log",
                  "token": "local",
                  "type": "template",
                  "value": "log.tgz"},
        task_cloud=task_cloud)


def setup_workflow():
    """Create a three-step test workflow with cross-task dependencies.

    step1 jobs are independent, step2 jobs each wait on two step1 outputs,
    and step3 jobs mix no-dependency jobs, step2 dependencies and an
    intra-task dependency.  Returns the assembled idds Workflow, named with
    a timestamp so repeated submissions are unique.
    """
    taskN1 = PanDATask()
    taskN1.step = "step1"
    taskN1.name = taskN1.step + "_" + randStr()
    # Six independent jobs named 000000..000005.
    taskN1.dependencies = [
        {"name": "00000" + str(k),
         "dependencies": [],
         "submitted": False} for k in range(6)
    ]

    taskN2 = PanDATask()
    taskN2.step = "step2"
    taskN2.name = taskN2.step + "_" + randStr()
    # Every step2 job waits on the same two step1 outputs; the entries were
    # identical apart from the job name, so they are generated here.
    taskN2.dependencies = [
        {
            "name": job_name,
            "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
                             {"task": taskN1.name, "inputname": "000002", "available": False}],
            "submitted": False
        } for job_name in ("000010", "000011", "000012")
    ]

    taskN3 = PanDATask()
    taskN3.step = "step3"
    taskN3.name = taskN3.step + "_" + randStr()
    taskN3.dependencies = [
        {
            "name": "000020",
            "dependencies": [],
            "submitted": False
        },
        {
            "name": "000021",
            "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
                             {"task": taskN2.name, "inputname": "000011", "available": False}],
            "submitted": False
        },
        {
            "name": "000022",
            "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
                             {"task": taskN2.name, "inputname": "000012", "available": False}],
            "submitted": False
        },
        {
            "name": "000023",
            "dependencies": [],
            "submitted": False
        },
        {
            # Intra-task dependency: this job waits on other step3 jobs.
            "name": "000024",
            "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
                             {"task": taskN3.name, "inputname": "000023", "available": False}],
            "submitted": False
        },
    ]

    work1 = _make_doma_work(taskN1, 1)
    work2 = _make_doma_work(taskN2, 2)
    work3 = _make_doma_work(taskN3, 3)

    # How long the workflow may stay pending before idds gives up.
    # NOTE(review): unit (hours?) is defined by idds -- confirm.
    pending_time = 12

    workflow = Workflow(pending_time=pending_time)
    workflow.add_work(work1)
    workflow.add_work(work2)
    workflow.add_work(work3)
    workflow.name = 'test_workflow.idds.%s.test' % time.time()
    return workflow
0184
0185
def submit(workflow, idds_server, request_id, signature):
    """Send *workflow* to iDDS as an update of build request *request_id*.

    *signature* authenticates the update; *idds_server* may be None to use
    the client's default host.  Prints and returns the API response.
    """
    client = pandaclient.idds_api.get_api(idds_utils.json_dumps,
                                          idds_host=idds_server,
                                          compress=True,
                                          manager=True)
    result = client.update_build_request(request_id, signature, workflow)
    print(result)
    return result
0197
0198
if __name__ == '__main__':

    # None lets the client library fall back to its configured default host.
    idds_host = None

    # Credentials injected by the iDDS build environment.
    request_id = os.environ.get("IDDS_BUILD_REQUEST_ID", None)
    # NOTE(review): "IDDS_BUIL_SIGNATURE" looks like a typo for
    # "IDDS_BUILD_SIGNATURE", but it may match the producing side --
    # confirm before renaming.
    signature = os.environ.get("IDDS_BUIL_SIGNATURE", None)

    if request_id is None:
        print("IDDS_BUILD_REQUEST_ID is not defined.")
        sys.exit(-1)
    if signature is None:
        print("IDDS_BUIL_SIGNATURE is not defined")
        sys.exit(-1)

    workflow = setup_workflow()

    ret = submit(workflow, idds_host, request_id, signature)
    print(ret)

    # ret is presumably (status_code, (success_flag, ...)) -- inferred from
    # this check; exit 0 only on full success so callers can detect failure.
    if ret[0] == 0 and ret[1][0] is True:
        sys.exit(0)
    sys.exit(-1)