Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Sergey Padolski, <spadolski@bnl.gov>, 2021
0010 # - Wen Guan, <wen.guan@cern.ch>, 2021
0011 
0012 
0013 """
0014 Test client.
0015 """
0016 
0017 import logging
0018 import string
0019 import random
0020 
# Configure the root logger at DEBUG level before the iDDS modules are imported below.
logging.basicConfig(level=logging.DEBUG)
0022 
0023 # import traceback
0024 
0025 # from rucio.client.client import Client as Rucio_Client
0026 # from rucio.common.exception import CannotAuthenticate
0027 
0028 # from idds.client.client import Client
0029 from idds.client.clientmanager import ClientManager   # noqa E402
0030 # from idds.common.constants import RequestType, RequestStatus
0031 # from idds.common.utils import get_rest_host
0032 # from idds.tests.common import get_example_real_tape_stagein_request
0033 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0034 
0035 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0036 # from idds.workflowv2.workflow import Condition, Workflow
0037 from idds.workflowv2.workflow import Workflow, Condition     # noqa E402
0038 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0039 from idds.doma.workflowv2.domapandawork import DomaPanDAWork   # noqa E402
0040 
0041 
# Queue name passed as ``task_queue`` to every DomaPanDAWork built in setup_workflow().
task_queue = 'DOMA_LSST_GOOGLE_TEST'
0043 
0044 
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """
    Return a random string of length N.

    :param chars: sequence of characters to draw from (default: lowercase
                  ASCII letters and digits).
    :param N: number of characters in the result (default: 10).

    :returns: the N randomly chosen characters joined into one string.
    """
    picked = []
    for _ in range(N):
        # One random.choice call per output character.
        picked.append(random.choice(chars))
    return ''.join(picked)
0047 
0048 
class PanDATask(object):
    """
    Plain container describing one task of the test workflow.

    :param name: task name (default: None).
    :param step: step label the task belongs to (default: None).
    :param dependencies: list of job dependency entries; when omitted, a new
                         empty list is created for this instance.
    """

    def __init__(self, name=None, step=None, dependencies=None):
        self.name = name
        self.step = step
        # Each instance gets its own list. A class-level ``dependencies = []``
        # would be shared by all instances, so an append through one task
        # would silently show up in every other task.
        self.dependencies = [] if dependencies is None else dependencies
0053 
0054 
def setup_workflow():
    """
    Assemble the three-step DOMA PanDA test workflow.

    Three PanDATask descriptions ("step1", "step2", "step3") are created, each
    with a name of the form ``<step>_<random suffix>`` and a list of job
    dependency entries. Each task is wrapped in a DomaPanDAWork, the works are
    added to a Workflow, and two Condition objects link ``is_finished`` of one
    work to the next work as ``true_work``.

    :returns: the populated Workflow object.
    """

    def new_task(step):
        # The random suffix comes from randStr(), so names differ between runs.
        task = PanDATask()
        task.step = step
        task.name = task.step + "_" + randStr()
        return task

    def job(name, inputs=()):
        # One dependency-map entry; ``inputs`` holds (task name, input name) pairs.
        return {"name": name,
                "dependencies": [{"task": task_name, "inputname": input_name, "available": False}
                                 for task_name, input_name in inputs],
                "submitted": False}

    def build_work(index, task, **extra):
        # All three works share the same settings apart from the collection
        # index, the task they wrap, and any extra keyword arguments.
        return DomaPanDAWork(executable='echo',
                             primary_input_collection={'scope': 'pseudo_dataset',
                                                       'name': 'pseudo_input_collection#%d' % index},
                             output_collections=[{'scope': 'pseudo_dataset',
                                                  'name': 'pseudo_output_collection#%d' % index}],
                             log_collections=[], dependency_map=task.dependencies,
                             task_name=task.name, task_queue=task_queue,
                             encode_command_line=True,
                             task_log={"dataset": "PandaJob_#{pandaid}/",
                                       "destination": "local",
                                       "param_type": "log",
                                       "token": "local",
                                       "type": "template",
                                       "value": "log.tgz"},
                             task_cloud='LSST',
                             **extra)

    # step1: six jobs ("000000" .. "000005") without any dependencies.
    step_one = new_task("step1")
    step_one_jobs = []
    for k in range(6):
        step_one_jobs.append(job("00000" + str(k)))
    step_one.dependencies = step_one_jobs

    # step2: three jobs, each depending on step1 jobs "000001" and "000002".
    step_two = new_task("step2")
    step_one_inputs = [(step_one.name, "000001"), (step_one.name, "000002")]
    step_two.dependencies = [job(job_name, step_one_inputs)
                             for job_name in ("000010", "000011", "000012")]

    # step3: a mix of independent jobs, jobs depending on step2, and one job
    # ("000024") depending on other jobs of step3 itself.
    step_three = new_task("step3")
    step_three.dependencies = [
        job("000020"),
        job("000021", [(step_two.name, "000010"), (step_two.name, "000011")]),
        job("000022", [(step_two.name, "000011"), (step_two.name, "000012")]),
        job("000023"),
        job("000024", [(step_three.name, "000021"), (step_three.name, "000023")]),
    ]

    # Only the first work passes task_priority explicitly (as None).
    first_work = build_work(1, step_one, task_priority=None)
    second_work = build_work(2, step_two)
    third_work = build_work(3, step_three)

    first_to_second = Condition(cond=first_work.is_finished, true_work=second_work)
    second_to_third = Condition(cond=second_work.is_finished, true_work=third_work)

    workflow = Workflow(pending_time=12)
    for work in (first_work, second_work, third_work):
        workflow.add_work(work)
    for condition in (first_to_second, second_to_third):
        workflow.add_condition(condition)
    return workflow
0177 
0178 
if __name__ == '__main__':
    # Build the workflow first, then create the client and submit it.
    test_workflow = setup_workflow()

    # host=None: no REST host is given explicitly here.
    # NOTE(review): presumably ClientManager then falls back to its configured default — confirm.
    client_manager = ClientManager(host=None)
    submitted_request_id = client_manager.submit(test_workflow)
    print(submitted_request_id)