File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 """
0014 Test client.
0015 """
0016
0017 import string
0018 import random
0019
0020
0021
0022
0023
0024
0025 from idds.common.utils import json_dumps
0026 from idds.common.constants import ContentStatus, RequestType, RequestStatus
0027 from idds.core.requests import get_requests, add_request
0028 from idds.core.messages import retrieve_messages
0029 from idds.core.transforms import get_transforms
0030 from idds.core.workprogress import get_workprogresses
0031 from idds.core.processings import get_processings
0032 from idds.core import transforms as core_transforms
0033
0034
0035 from idds.client.clientmanager import ClientManager
0036
0037 from idds.common.utils import get_rest_host
0038
0039
0040
0041
0042
0043 from idds.workflowv2.workflow import Workflow
0044
0045 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0046
0047
0048 task_queue = 'DOMA_LSST_GOOGLE_TEST'
0049
0050
0051 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0052 return ''.join(random.choice(chars) for _ in range(N))
0053
0054
0055 class PanDATask(object):
0056 name = None
0057 step = None
0058 dependencies = []
0059
0060
0061 def get_workflow():
0062
0063 taskN1 = PanDATask()
0064 taskN1.step = "step1"
0065 taskN1.name = taskN1.step + "_" + randStr()
0066 taskN1.dependencies = [
0067 {"name": "00000" + str(k),
0068 "dependencies": [],
0069 "submitted": False} for k in range(6)
0070 ]
0071
0072 taskN2 = PanDATask()
0073 taskN2.step = "step2"
0074 taskN2.name = taskN2.step + "_" + randStr()
0075 taskN2.dependencies = [
0076 {
0077 "name": "000010",
0078 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0079 {"task": taskN1.name, "inputname": "000002", "available": False}],
0080 "submitted": False
0081 },
0082 {
0083 "name": "000011",
0084 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0085 {"task": taskN1.name, "inputname": "000002", "available": False}],
0086 "submitted": False
0087 },
0088 {
0089 "name": "000012",
0090 "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0091 {"task": taskN1.name, "inputname": "000002", "available": False}],
0092 "submitted": False
0093 }
0094 ]
0095
0096 taskN3 = PanDATask()
0097 taskN3.step = "step3"
0098 taskN3.name = taskN3.step + "_" + randStr()
0099 taskN3.dependencies = [
0100 {
0101 "name": "000020",
0102 "dependencies": [],
0103 "submitted": False
0104 },
0105 {
0106 "name": "000021",
0107 "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0108 {"task": taskN2.name, "inputname": "000011", "available": False}],
0109 "submitted": False
0110 },
0111 {
0112 "name": "000022",
0113 "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0114 {"task": taskN2.name, "inputname": "000012", "available": False}],
0115 "submitted": False
0116 },
0117 {
0118 "name": "000023",
0119 "dependencies": [],
0120 "submitted": False
0121 },
0122 {
0123 "name": "000024",
0124 "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0125 {"task": taskN3.name, "inputname": "000023", "available": False}],
0126 "submitted": False
0127 },
0128 ]
0129
0130 work1 = DomaPanDAWork(executable='echo',
0131 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0132 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0133 log_collections=[], dependency_map=taskN1.dependencies, task_name=taskN1.name, task_queue=task_queue)
0134 work2 = DomaPanDAWork(executable='echo',
0135 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0136 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0137 log_collections=[], dependency_map=taskN2.dependencies, task_name=taskN2.name, task_queue=task_queue)
0138 work3 = DomaPanDAWork(executable='echo',
0139 primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0140 output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0141 log_collections=[], dependency_map=taskN3.dependencies, task_name=taskN3.name, task_queue=task_queue)
0142
0143 workflow = Workflow()
0144 workflow.add_work(work1)
0145 workflow.add_work(work2)
0146 workflow.add_work(work3)
0147 return workflow
0148
0149
0150 def get_workflow_props(workflow):
0151 props = {
0152 'scope': 'workflow',
0153 'name': workflow.name,
0154 'requester': 'panda',
0155 'request_type': RequestType.Workflow,
0156 'transform_tag': 'workflow',
0157 'status': RequestStatus.New,
0158 'priority': 0,
0159 'lifetime': 30,
0160 'workload_id': workflow.get_workload_id(),
0161 'request_metadata': {'workload_id': workflow.get_workload_id(), 'workflow': workflow}
0162 }
0163
0164 primary_init_work = workflow.get_primary_initial_collection()
0165 if primary_init_work:
0166 props['scope'] = primary_init_work['scope']
0167 props['name'] = primary_init_work['name']
0168 return props
0169
0170
0171 def test_running_data():
0172 workflow = get_workflow()
0173 props = get_workflow_props(workflow)
0174 request_id = add_request(**props)
0175 return request_id
0176
0177
0178 def show_request(req):
0179 workflow = req['processing_metadata']['workflow']
0180 print(workflow.independent_works)
0181 print(len(workflow.independent_works))
0182 print(workflow.works_template.keys())
0183 print(len(workflow.works_template.keys()))
0184 print(workflow.work_sequence.keys())
0185 print(len(workflow.work_sequence.keys()))
0186 print(workflow.works.keys())
0187 print(len(workflow.works.keys()))
0188
0189 work_ids = []
0190 for i_id in workflow.works:
0191 work = workflow.works[i_id]
0192 print(i_id)
0193 print(work.work_name)
0194 print(work.task_name)
0195 print(work.work_id)
0196 work_ids.append(work.work_id)
0197 print(work_ids)
0198
0199
0200 def test_get_requests(request_id):
0201 reqs = get_requests(request_id=request_id, with_detail=False)
0202 for req in reqs:
0203
0204 print(req)
0205 print(json_dumps(req, sort_keys=True, indent=4))
0206 pass
0207
0208
0209 if __name__ == '__main__':
0210 request_id = test_running_data()
0211 print(request_id)
0212 test_get_requests(request_id=3)