#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Wen Guan, <wen.guan@cern.ch>, 2023


"""
Test LSST generic workflow.
"""

import json
import logging
import sys                                                    # noqa E402 F401

logging.basicConfig(level=logging.DEBUG)

# import traceback

from collections import Counter                                # noqa E402
from lsst.ctrl.bps import generic_workflow as gw               # noqa E402

# from rucio.client.client import Client as Rucio_Client
# from rucio.common.exception import CannotAuthenticate

# from idds.client.client import Client
from idds.client.clientmanager import ClientManager   # noqa E402
# from idds.common.constants import RequestType, RequestStatus
from idds.common.utils import get_rest_host                               # noqa E402
# from idds.tests.common import get_example_real_tape_stagein_request
# from idds.tests.common import get_example_prodsys2_tape_stagein_request

# from idds.workflowv2.work import Work, Parameter, WorkStatus
# from idds.workflowv2.workflow import Condition, Workflow
from idds.workflowv2.workflow import Workflow, Condition     # noqa E402
# from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
from idds.doma.workflowv2.domapandawork import DomaPanDAWork       # noqa E402
from idds.doma.workflowv2.domapandaeswork import DomaPanDAESWork   # noqa E402
from idds.doma.workflowv2.domatree import DomaTree                 # noqa E402
from idds.doma.workflowv2.domaeventmap import DomaEventMap         # noqa E402


task_cloud = 'US'
task_queue = 'SLAC_Rubin'
# task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'


# task_cloud = 'EU'
# task_queue = 'CC-IN2P3_TEST'

# task_cloud = 'EU'
# task_queue = 'LANCS_TEST'


def setup_gw_workflow():
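    """Build a small GenericWorkflow: three jobs (job1 -> job2, job3) plus a
    four-job source workflow attached with add_workflow_source()."""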
    gwf = gw.GenericWorkflow("mytest")

    exec1 = gw.GenericWorkflowExec(
        name="test.py", src_uri="${CTRL_BPS_DIR}/bin/test1.py", transfer_executable=False
    )
    job1 = gw.GenericWorkflowJob("job1", label="label1")
    job1.quanta_counts = Counter({"pt1": 1, "pt2": 2})
    job1.executable = exec1

    job2 = gw.GenericWorkflowJob("job2", label="label2")
    job2.quanta_counts = Counter({"pt1": 1, "pt2": 2})
    job2.executable = exec1

    job3 = gw.GenericWorkflowJob("job3")
    job3.label = "label2"
    job3.quanta_counts = Counter({"pt1": 1, "pt2": 2})
    job3.executable = exec1

    gwf.add_job(job1)
    gwf.add_job(job2)
    gwf.add_job(job3)
    gwf.add_job_relationships("job1", ["job2", "job3"])
    # gwf.add_job_relationships("job1", "job2")

    srcjob1 = gw.GenericWorkflowJob("srcjob1")
    srcjob1.label = "srclabel1"
    srcjob1.executable = exec1
    srcjob2 = gw.GenericWorkflowJob("srcjob2")
    srcjob2.label = "srclabel1"
    srcjob2.executable = exec1
    srcjob3 = gw.GenericWorkflowJob("srcjob3")
    srcjob3.label = "srclabel2"
    srcjob3.executable = exec1
    srcjob4 = gw.GenericWorkflowJob("srcjob4")
    srcjob4.label = "srclabel2"
    srcjob4.executable = exec1
    gwf2 = gw.GenericWorkflow("mytest2")
    gwf2.add_job(srcjob1)
    gwf2.add_job(srcjob2)
    gwf2.add_job(srcjob3)
    gwf2.add_job(srcjob4)
    gwf2.add_job_relationships("srcjob1", "srcjob3")
    gwf2.add_job_relationships("srcjob2", "srcjob4")

    gwf.add_workflow_source(gwf2)
    return gwf


def setup_gw_workflow2():
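    """Build a larger GenericWorkflow: one init job fanning out to 1000
    isr -> characterizeImage -> calibrate chains that feed 10 writePreSourceTable jobs."""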
    gwf = gw.GenericWorkflow("mytest")

    exec1 = gw.GenericWorkflowExec(
        name="test1.py", src_uri="${CTRL_BPS_DIR}/bin/test1.py", transfer_executable=False
    )

    exec2 = gw.GenericWorkflowExec(
        name="test2.py", src_uri="${CTRL_BPS_DIR}/bin/test2.py", transfer_executable=False
    )

    exec3 = gw.GenericWorkflowExec(
        name="test3.py", src_uri="${CTRL_BPS_DIR}/bin/test3.py", transfer_executable=False
    )

    exec4 = gw.GenericWorkflowExec(
        name="test4.py", src_uri="${CTRL_BPS_DIR}/bin/test4.py", transfer_executable=False
    )

    exec5 = gw.GenericWorkflowExec(
        name="test5.py", src_uri="${CTRL_BPS_DIR}/bin/test5.py", transfer_executable=False
    )

    job1 = gw.GenericWorkflowJob("init1", label="init")
    job1.quanta_counts = Counter({"pt1": 1, "pt2": 2})
    job1.executable = exec1
    gwf.add_job(job1)

    for i in range(1000):
        job2 = gw.GenericWorkflowJob("isr_%s" % i, label="isr")
        job2.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job2.executable = exec2
        job2.attrs['grouping'] = True
        job2.attrs['grouping_max_jobs'] = 160
        gwf.add_job(job2)
        gwf.add_job_relationships("init1", "isr_%s" % i)

    for i in range(1000):
        job3 = gw.GenericWorkflowJob("characterizeImage_%s" % i, label="characterizeImage")
        job3.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job3.executable = exec3
        job3.attrs['grouping'] = True
        job3.attrs['grouping_max_jobs'] = 160
        gwf.add_job(job3)
        gwf.add_job_relationships("isr_%s" % i, "characterizeImage_%s" % i)

    for i in range(1000):
        job4 = gw.GenericWorkflowJob("calibrate_%s" % i, label="calibrate")
        job4.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job4.executable = exec4
        job4.attrs['grouping'] = True
        job4.attrs['grouping_max_jobs'] = 100
        gwf.add_job(job4)
        gwf.add_job_relationships("characterizeImage_%s" % i, "calibrate_%s" % i)

    for i in range(10):
        job5 = gw.GenericWorkflowJob("writePreSourceTable_%s" % i, label="writePreSourceTable")
        job5.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job5.executable = exec5
        job5.attrs['grouping'] = True
        job5.attrs['grouping_max_jobs'] = 8
        gwf.add_job(job5)

    for i in range(1000):
        gwf.add_job_relationships("calibrate_%s" % i, "writePreSourceTable_%s" % int(i / 100))

    return gwf


def test_show_jobs(generic_workflow):
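    """Walk the workflow label by label and job by job; the dependency and
    pseudo-filename handling is kept only as commented-out reference code."""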
    for job_label in generic_workflow.labels:
        jobs_by_label = generic_workflow.get_jobs_by_label(job_label)
        for gwjob in jobs_by_label:
            # pseudo_filename = _make_pseudo_filename(config, gwjob)
            # job_to_pseudo_filename[gwjob.name] = pseudo_filename
            # job_to_task[gwjob.name] = work.get_work_name()

            # deps = []
            for parent_job_name in generic_workflow.predecessors(gwjob.name):
                # deps.append({"task": job_to_task[parent_job_name],
                #              "inputname": job_to_pseudo_filename[parent_job_name],
                #              "available": False})
                pass

            # job = {"name": pseudo_filename, "dependencies": deps}


def construct_doma_jobs(generic_workflow):
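    """Group the generic workflow with DomaTree, print the grouped event-service
    jobs, build a DomaEventMap from the groups and reload it as a cross-check."""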
    tree = DomaTree('test_tree', group_type='width')
    grouped_jobs = tree.from_generic_workflow(generic_workflow)
    # print(grouped_jobs)
    for grouped_label in grouped_jobs:
        print(grouped_label)
        for eventservice in grouped_jobs[grouped_label]:
            print("    %s" % eventservice['name'])
            print("        %s" % eventservice['events'])

    event_map = tree.construct_event_map(grouped_jobs)
    print(event_map)

    event_map1 = DomaEventMap()
    event_map1.load()
    print(event_map1)

    return event_map


def setup_workflow(event_map):
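    """Wrap every task in the event map in a DomaPanDAESWork and collect them
    into an iDDS Workflow."""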
    pending_time = 12
    # pending_time = None
    workflow = Workflow(pending_time=pending_time)
    workflow.name = event_map.name

    for task_name in event_map.tasks:
        task = event_map.get_task(task_name)
        executable = "cmd_line_es_decoder.py"
        dependency_map = task.get_dependency_map()
        work = DomaPanDAESWork(executable=executable,
                               primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                               output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
                               log_collections=[], es_dependency_map=dependency_map,
                               task_name=task_name, task_queue=task_queue,
                               encode_command_line=True,
                               prodSourceLabel='managed',
                               task_log={"dataset": "PandaJob_#{pandaid}/",
                                         "destination": "local",
                                         "param_type": "log",
                                         "token": "local",
                                         "type": "template",
                                         "value": "log.tgz"},
                               task_cloud=task_cloud)
        workflow.add_work(work)
    return workflow


def test():
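    """End-to-end test: build the small workflow, construct its event map,
    create the iDDS workflow and submit it through ClientManager."""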
    gw_workflow = setup_gw_workflow()
    # print(json.dumps(gw_workflow))
    # gw_workflow = setup_gw_workflow2()
    test_show_jobs(gw_workflow)
    event_map = construct_doma_jobs(gw_workflow)

    print("event_map")
    print(json.dumps(event_map.dict(), sort_keys=True, indent=4))
    # sys.exit(0)

    print("task_dep_map")

    for task_name in event_map.tasks:
        task = event_map.get_task(task_name)
        print("task_name: %s" % task_name)
        print(json.dumps(task.get_dependency_map(), sort_keys=True, indent=4))
    workflow = setup_workflow(event_map)

    # sys.exit(0)

    host = get_rest_host()
    wm = ClientManager(host=host)
    # wm.set_original_user(user_name="wguandev")
    request_id = wm.submit(workflow, use_dataset_name=False)
    print(request_id)


def test_load():
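    """Reload the saved DomaEventMap and exercise the per-event status calls
    (finished/failed/missing, reporting and acknowledgement)."""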
    event_map = DomaEventMap()
    event_map.load()
    print(event_map)

    # event file name
    task_name = "srclabel1_srclabel2_label1_label2"
    job_name = "eventservice_srclabel1_srclabel2_label1_label2_0_0"
    event_id = [0, 1, 2, 3]
    event_task = event_map.get_task(task_name)
    event_job = event_task.get_job(job_name)

    # sync event status from panda
    # get event status from panda
    event_job.set_event_finished(event_id[0], reported=True)    # reported=True means this event does not need to be reported back to panda
    event_job.set_event_failed(event_id[1], reported=True)
    event_job.set_event_missing(event_id[2], reported=True)

    # check the fourth event
    is_ok = event_job.is_ok_to_process_event(event_id[3])
    print("is_ok: %s" % is_ok)
    if is_ok:
        event = event_job.get_event(event_id[3])
        # process event
        print(event)
        # ctrl bps job
        print(event.gwjob)
        event_job.set_event_finished(event.event_index)

    to_report = event_job.get_events_to_report()
    print(to_report)
    # report event status to panda
    event_job.acknowledge_event_report(to_report)   # update the report status to avoid reporting these events again
    to_report = event_job.get_events_to_report()
    print(to_report)


def test1():
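    """Same flow as test() but with the larger workflow from setup_gw_workflow2(),
    without submitting to iDDS."""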
    # gw_workflow = setup_gw_workflow()
    # print(json.dumps(gw_workflow))
    gw_workflow = setup_gw_workflow2()
    test_show_jobs(gw_workflow)
    construct_doma_jobs(gw_workflow)


def test_load1():
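    """Reload the saved DomaEventMap and print it."""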
    event_map = DomaEventMap()
    event_map.load()
    print(event_map)

    # event file name
    # task_name = "srclabel1_srclabel2_label1_label2"
    # job_name = "eventservice_srclabel1_srclabel2_label1_label2_0_0"


if __name__ == '__main__':
    test()
    test_load()
    # test1()
    # test_load1()