Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Sergey Padolski, <spadolski@bnl.gov>, 2021
0010 # - Wen Guan, <wen.guan@cern.ch>, 2021
0011 
0012 
0013 """
0014 Test client.
0015 """
0016 
0017 import string
0018 import random
0019 import time
0020 
0021 # import traceback
0022 
0023 # from rucio.client.client import Client as Rucio_Client
0024 # from rucio.common.exception import CannotAuthenticate
0025 
0026 # from idds.client.client import Client
0027 # from idds.client.clientmanager import ClientManager
0028 # from idds.common.constants import RequestType, RequestStatus
0029 from idds.common.utils import get_rest_host
0030 # from idds.tests.common import get_example_real_tape_stagein_request
0031 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0032 
0033 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0034 # from idds.workflowv2.workflow import Condition, Workflow
0035 from idds.workflowv2.workflow import Workflow
0036 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0037 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0038 
0039 import idds.common.utils as idds_utils
0040 import pandaclient.idds_api
0041 
0042 
# PanDA cloud and queue handed to every DomaPanDAWork built in
# setup_workflow().  Only one queue is active at a time; the alternatives
# are kept commented out so that no assignment is silently overwritten.
# task_queue = 'DOMA_LSST_GOOGLE_TEST'
# task_queue = 'DOMA_LSST_GOOGLE_MERGE'
# task_queue = "SLAC_Rubin"

task_cloud = 'US'
task_queue = 'BNL_OSG_2'
0049 
0050 
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of ``N`` characters drawn from ``chars``.

    Every character is chosen independently with ``random.choice``, so
    characters may repeat.  With the defaults the result is ten characters
    taken from the lowercase ASCII letters and the decimal digits.
    """
    picked = []
    for _ in range(N):
        picked.append(random.choice(chars))
    return ''.join(picked)
0053 
0054 
class PanDATask(object):
    """Plain container describing one PanDA task of the test workflow.

    Attributes:
        name: task name, ``None`` until assigned.
        step: name of the workflow step the task belongs to, ``None`` until
            assigned.
        dependencies: list describing the jobs of the task; every instance
            owns its own list.
    """

    # Immutable class-level defaults, kept so ``PanDATask.name`` and
    # ``PanDATask.step`` remain readable on the class itself.
    name = None
    step = None

    def __init__(self, name=None, step=None, dependencies=None):
        """Create a task; all arguments are optional.

        Args:
            name: initial task name.
            step: initial step name.
            dependencies: optional iterable of initial entries; it is copied
                so the caller's container is never shared with the task.
        """
        self.name = name
        self.step = step
        # A class-level ``dependencies = []`` would be one list shared by
        # every instance that mutates it in place; build a fresh one here.
        self.dependencies = [] if dependencies is None else list(dependencies)
0059 
0060 
def setup_workflow():
    """Build the three-step test workflow submitted by this script.

    Three ``PanDATask`` descriptions ("step1", "step2", "step3") are created,
    each named ``<step>_<random suffix>``:

    * step1 has six jobs ("000000" .. "000005") without dependencies;
    * step2 has three jobs that each wait for step1 inputs "000001" and
      "000002";
    * step3 has five jobs: two without dependencies, two waiting for step2
      inputs, and one ("000024") waiting for two other step3 inputs.

    Each task is wrapped in a ``DomaPanDAWork`` running ``echo`` on the
    module-level ``task_queue`` / ``task_cloud``, and the works are added in
    order to a ``Workflow`` created with ``pending_time=12``.

    Returns:
        The populated ``Workflow``, named ``test_workflow.idds.<timestamp>``.
    """

    def _dep(task_name, input_name):
        # One upstream input that a job waits for.
        return {"task": task_name, "inputname": input_name, "available": False}

    def _job(job_name, upstream=()):
        # One job entry of a dependency map; ``upstream`` is a sequence of
        # (task name, input name) pairs.
        return {"name": job_name,
                "dependencies": [_dep(t, i) for t, i in upstream],
                "submitted": False}

    def _task(step_name):
        # New task description with a randomized, step-prefixed name.
        task = PanDATask()
        task.step = step_name
        task.name = task.step + "_" + randStr()
        return task

    def _work(index, task):
        # Wrap one task description in a DomaPanDAWork; ``index`` only
        # selects the pseudo collection names.  A fresh log template dict is
        # built for every work so no dict is shared between works.
        log_template = {"dataset": "PandaJob_#{pandaid}/",
                        "destination": "local",
                        "param_type": "log",
                        "token": "local",
                        "type": "template",
                        "value": "log.tgz"}
        return DomaPanDAWork(
            executable='echo',
            primary_input_collection={'scope': 'pseudo_dataset',
                                      'name': 'pseudo_input_collection#%d' % index},
            output_collections=[{'scope': 'pseudo_dataset',
                                 'name': 'pseudo_output_collection#%d' % index}],
            log_collections=[],
            dependency_map=task.dependencies,
            task_name=task.name,
            task_queue=task_queue,
            encode_command_line=True,
            task_log=log_template,
            task_cloud=task_cloud)

    # Step 1: six independent jobs.
    first = _task("step1")
    first.dependencies = [_job("00000" + str(k)) for k in range(6)]

    # Step 2: every job needs the same two step-1 inputs.
    second = _task("step2")
    second.dependencies = [
        _job(job_name, [(first.name, "000001"), (first.name, "000002")])
        for job_name in ("000010", "000011", "000012")
    ]

    # Step 3: mixes free jobs, jobs fed by step 2, and one job that depends
    # on inputs of step 3 itself.
    third = _task("step3")
    third.dependencies = [
        _job("000020"),
        _job("000021", [(second.name, "000010"), (second.name, "000011")]),
        _job("000022", [(second.name, "000011"), (second.name, "000012")]),
        _job("000023"),
        _job("000024", [(third.name, "000021"), (third.name, "000023")]),
    ]

    # All works are constructed before the workflow, then added in order.
    works = [_work(index, task)
             for index, task in enumerate((first, second, third), start=1)]

    workflow = Workflow(pending_time=12)
    for work in works:
        workflow.add_work(work)
    workflow.name = 'test_workflow.idds.%s' % time.time()
    return workflow
0178 
0179 
def submit(workflow, idds_server):
    """Submit ``workflow`` to iDDS through the PanDA client API.

    Args:
        workflow: the workflow object to submit.
        idds_server: accepted for interface compatibility but currently
            ignored (see the note in the body).

    Returns:
        The request id returned by the client's ``submit`` call.
    """
    # NOTE(review): the caller-supplied host is discarded, exactly as in the
    # original code; presumably the PanDA client then uses its own configured
    # iDDS host.  Confirm before honoring the argument.
    idds_server = None
    c = pandaclient.idds_api.get_api(idds_utils.json_dumps, verbose=True,
                                     idds_host=idds_server, compress=True, manager=True)
    request_id = c.submit(workflow, username=None, use_dataset_name=False)
    # print() does not interpolate extra positional arguments the way the
    # logging module does, so the message must be formatted explicitly.
    print("Submitted into iDDs with request id=%s" % str(request_id))
    return request_id
0187 
0188 
if __name__ == '__main__':
    # Resolve the REST host first, then build the workflow and submit it
    # through the PanDA client.  (Submitting directly with
    # idds.client.clientmanager.ClientManager(host=host).submit(workflow)
    # is the alternative path, currently not used.)
    host = get_rest_host()
    submit(setup_workflow(), host)