File indexing completed on 2026-04-09 07:58:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test lsst generic workflow.
0014 """
0015
0016 import json
0017 import logging
0018 import sys
0019
# Configure the root logger at DEBUG level. This call sits ahead of the
# lsst/idds imports that follow it in this file.
logging.basicConfig(level=logging.DEBUG)
0021
0022
0023
0024 from collections import Counter
0025 from lsst.ctrl.bps import generic_workflow as gw
0026
0027
0028
0029
0030
0031 from idds.client.clientmanager import ClientManager
0032
0033 from idds.common.utils import get_rest_host
0034
0035
0036
0037
0038
0039 from idds.workflowv2.workflow import Workflow, Condition
0040
0041 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0042 from idds.doma.workflowv2.domapandaeswork import DomaPanDAESWork
0043 from idds.doma.workflowv2.domatree import DomaTree
0044 from idds.doma.workflowv2.domaeventmap import DomaEventMap
0045
0046
# Queue and cloud handed to every DomaPanDAESWork created by setup_workflow()
# (as its ``task_queue`` / ``task_cloud`` keyword arguments).
task_queue = "SLAC_Rubin"
task_cloud = "US"
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
def setup_gw_workflow():
    """Build a small generic workflow with an attached source workflow.

    The main workflow ``"mytest"`` holds three jobs sharing one executable:
    ``job1`` (label ``label1``) is the parent of ``job2`` and ``job3`` (both
    label ``label2``).  A second workflow ``"mytest2"`` holds four jobs with
    the edges ``srcjob1 -> srcjob3`` and ``srcjob2 -> srcjob4`` and is attached
    to the main one through ``add_workflow_source``.

    :returns: the main ``GenericWorkflow``.
    """
    workflow = gw.GenericWorkflow("mytest")

    script = gw.GenericWorkflowExec(
        name="test.py", src_uri="${CTRL_BPS_DIR}/bin/test1.py", transfer_executable=False
    )

    # job3 receives its label by attribute assignment, not via the constructor.
    first = gw.GenericWorkflowJob("job1", label="label1")
    second = gw.GenericWorkflowJob("job2", label="label2")
    third = gw.GenericWorkflowJob("job3")
    third.label = "label2"

    for job in (first, second, third):
        # Every job gets its own Counter instance.
        job.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job.executable = script
        workflow.add_job(job)
    workflow.add_job_relationships("job1", ["job2", "job3"])

    # Source jobs carry a label and the executable, but no quanta counts.
    source_specs = (
        ("srcjob1", "srclabel1"),
        ("srcjob2", "srclabel1"),
        ("srcjob3", "srclabel2"),
        ("srcjob4", "srclabel2"),
    )
    source_jobs = []
    for job_name, job_label in source_specs:
        source_job = gw.GenericWorkflowJob(job_name)
        source_job.label = job_label
        source_job.executable = script
        source_jobs.append(source_job)

    source_workflow = gw.GenericWorkflow("mytest2")
    for source_job in source_jobs:
        source_workflow.add_job(source_job)
    source_workflow.add_job_relationships("srcjob1", "srcjob3")
    source_workflow.add_job_relationships("srcjob2", "srcjob4")

    workflow.add_workflow_source(source_workflow)
    return workflow
0106
0107
def setup_gw_workflow2():
    """Build a large five-stage generic workflow (3011 jobs).

    Layout::

        init1 -> isr_i -> characterizeImage_i -> calibrate_i   (i = 0..999)
        calibrate_i -> writePreSourceTable_(i // 100)          (10 table jobs)

    Every job except ``init1`` has ``attrs['grouping'] = True`` and a
    per-stage ``attrs['grouping_max_jobs']`` (160, 160, 100 and 8).

    :returns: the populated ``GenericWorkflow`` named ``"mytest"``.
    """
    workflow = gw.GenericWorkflow("mytest")

    # One executable per stage, created in the order test1.py .. test5.py.
    script_names = ("test1.py", "test2.py", "test3.py", "test4.py", "test5.py")
    init_exec, isr_exec, char_exec, calib_exec, table_exec = [
        gw.GenericWorkflowExec(
            name=script, src_uri="${CTRL_BPS_DIR}/bin/" + script, transfer_executable=False
        )
        for script in script_names
    ]

    init_job = gw.GenericWorkflowJob("init1", label="init")
    init_job.quanta_counts = Counter({"pt1": 1, "pt2": 2})
    init_job.executable = init_exec
    workflow.add_job(init_job)

    # (label, executable, grouping_max_jobs, parent label); a parent label of
    # None means the stage hangs directly off "init1".  A stage is added
    # completely before the next one starts, so job insertion order is kept.
    chained_stages = (
        ("isr", isr_exec, 160, None),
        ("characterizeImage", char_exec, 160, "isr"),
        ("calibrate", calib_exec, 100, "characterizeImage"),
    )
    for label, stage_exec, max_jobs, parent_label in chained_stages:
        for idx in range(1000):
            job_name = "%s_%s" % (label, idx)
            job = gw.GenericWorkflowJob(job_name, label=label)
            job.quanta_counts = Counter({"pt1": 1, "pt2": 2})
            job.executable = stage_exec
            job.attrs['grouping'] = True
            job.attrs['grouping_max_jobs'] = max_jobs
            workflow.add_job(job)

            if parent_label is None:
                parent_name = "init1"
            else:
                parent_name = "%s_%s" % (parent_label, idx)
            workflow.add_job_relationships(parent_name, job_name)

    for idx in range(10):
        table_job = gw.GenericWorkflowJob("writePreSourceTable_%s" % idx, label="writePreSourceTable")
        table_job.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        table_job.executable = table_exec
        table_job.attrs['grouping'] = True
        table_job.attrs['grouping_max_jobs'] = 8
        workflow.add_job(table_job)

    # Fan-in: each run of 100 consecutive calibrate jobs feeds one table job.
    for idx in range(1000):
        workflow.add_job_relationships("calibrate_%s" % idx, "writePreSourceTable_%s" % (idx // 100))

    return workflow
0175
0176
0177 def test_show_jobs(generic_workflow):
0178 for job_label in generic_workflow.labels:
0179 jobs_by_label = generic_workflow.get_jobs_by_label(job_label)
0180 for gwjob in jobs_by_label:
0181
0182
0183
0184
0185
0186 for parent_job_name in generic_workflow.predecessors(gwjob.name):
0187
0188
0189
0190 pass
0191
0192
0193
0194
def construct_doma_jobs(generic_workflow):
    """Group the jobs of *generic_workflow* with a ``DomaTree`` and build its event map.

    A ``DomaTree('test_tree', group_type='width')`` groups the workflow; each
    group label is printed, followed by the ``name`` and ``events`` of every
    event-service entry in that group.  The event map built from the groups
    is printed and returned.  A second, freshly created ``DomaEventMap`` is
    then loaded and printed as well.

    :param generic_workflow: workflow handed to ``DomaTree.from_generic_workflow``.
    :returns: the event map produced by ``DomaTree.construct_event_map``.
    """
    tree = DomaTree('test_tree', group_type='width')
    groups = tree.from_generic_workflow(generic_workflow)

    for label in groups:
        print(label)
        for es_entry in groups[label]:
            es_name = es_entry['name']
            es_events = es_entry['events']
            print(" %s" % es_name)
            print(" %s" % es_events)

    built_map = tree.construct_event_map(groups)
    print(built_map)

    # NOTE(review): presumably reads back what construct_event_map stored --
    # confirm against DomaEventMap.load().
    reloaded_map = DomaEventMap()
    reloaded_map.load()
    print(reloaded_map)

    return built_map
0213
0214
def setup_workflow(event_map):
    """Build a ``Workflow`` holding one ``DomaPanDAESWork`` per task of *event_map*.

    The workflow is created with ``pending_time=12`` and takes its name from
    ``event_map.name``.  Each work runs ``cmd_line_es_decoder.py``, receives
    the dependency map of its task as ``es_dependency_map``, and is given the
    module-level ``task_queue`` and ``task_cloud``.

    :param event_map: object exposing ``name``, ``tasks`` and ``get_task(name)``.
    :returns: the populated ``Workflow``.
    """
    workflow = Workflow(pending_time=12)
    workflow.name = event_map.name

    for task_name in event_map.tasks:
        es_dependencies = event_map.get_task(task_name).get_dependency_map()

        # Built inside the loop so every work gets its own dict/list objects.
        work_options = {
            'executable': "cmd_line_es_decoder.py",
            'primary_input_collection': {'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
            'output_collections': [{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
            'log_collections': [],
            'es_dependency_map': es_dependencies,
            'task_name': task_name,
            'task_queue': task_queue,
            'encode_command_line': True,
            'prodSourceLabel': 'managed',
            'task_log': {
                "dataset": "PandaJob_#{pandaid}/",
                "destination": "local",
                "param_type": "log",
                "token": "local",
                "type": "template",
                "value": "log.tgz",
            },
            'task_cloud': task_cloud,
        }
        workflow.add_work(DomaPanDAESWork(**work_options))

    return workflow
0241
0242
def test():
    """Run the small workflow end to end and submit it.

    Steps: build the workflow from ``setup_gw_workflow``, walk its jobs,
    convert it to an event map, print the event map and each task's
    dependency map as sorted, indented JSON, wrap the event map into a
    workflow via ``setup_workflow``, submit it through a ``ClientManager``
    bound to ``get_rest_host()``, and print the value returned by ``submit``.
    """

    def as_json(payload):
        # Shared JSON layout for both dumps below.
        return json.dumps(payload, sort_keys=True, indent=4)

    generic_workflow = setup_gw_workflow()
    test_show_jobs(generic_workflow)
    event_map = construct_doma_jobs(generic_workflow)

    print("event_map")
    print(as_json(event_map.dict()))

    print("task_dep_map")
    for task_name in event_map.tasks:
        dependency_map_source = event_map.get_task(task_name)
        print("task_name :%s" % task_name)
        print(as_json(dependency_map_source.get_dependency_map()))

    idds_workflow = setup_workflow(event_map)

    client = ClientManager(host=get_rest_host())
    submitted_id = client.submit(idds_workflow, use_dataset_name=False)
    print(submitted_id)
0269
0270
def test_load():
    """Load a ``DomaEventMap`` and drive one event-service job through its event states.

    Looks up task ``srclabel1_srclabel2_label1_label2`` and its job
    ``eventservice_srclabel1_srclabel2_label1_label2_0_0``, then:

    * marks event 0 finished, event 1 failed and event 2 missing, each with
      ``reported=True``;
    * asks whether event 3 may be processed and, if so, fetches it, prints it
      and its ``gwjob``, and marks it finished (without ``reported``);
    * prints the events still to report, acknowledges them, and prints the
      events to report once more.
    """
    event_map = DomaEventMap()
    event_map.load()
    print(event_map)

    es_task = event_map.get_task("srclabel1_srclabel2_label1_label2")
    es_job = es_task.get_job("eventservice_srclabel1_srclabel2_label1_label2_0_0")

    finished_id, failed_id, missing_id, candidate_id = 0, 1, 2, 3

    es_job.set_event_finished(finished_id, reported=True)
    es_job.set_event_failed(failed_id, reported=True)
    es_job.set_event_missing(missing_id, reported=True)

    is_ok = es_job.is_ok_to_process_event(candidate_id)
    print("is_ok: %s" % is_ok)
    # NOTE(review): everything that uses `event` is taken to belong to this
    # branch -- confirm against the original layout.
    if is_ok:
        event = es_job.get_event(candidate_id)
        print(event)
        print(event.gwjob)
        es_job.set_event_finished(event.event_index)

    pending_report = es_job.get_events_to_report()
    print(pending_report)

    es_job.acknowledge_event_report(pending_report)
    print(es_job.get_events_to_report())
0306
0307
def test1():
    """Build the large workflow from ``setup_gw_workflow2``, walk its jobs, and group it.

    The event map returned by ``construct_doma_jobs`` is discarded; nothing
    is submitted.
    """
    large_workflow = setup_gw_workflow2()
    test_show_jobs(large_workflow)
    construct_doma_jobs(large_workflow)
0314
0315
def test_load1():
    """Create a ``DomaEventMap``, call its ``load()``, and print the result."""
    loaded_map = DomaEventMap()
    loaded_map.load()
    print(loaded_map)
0320
0321
0322
0323
0324
0325
# Script entry point: run the submission test first, then the event-map
# load test.
if __name__ == "__main__":
    test()
    test_load()
0329
0330