Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Sergey Padolski, <spadolski@bnl.gov>, 2021
0010 # - Wen Guan, <wen.guan@cern.ch>, 2021
0011 
0012 
0013 """
0014 Test client.
0015 """
0016 
0017 import sys
0018 import string
0019 import random
0020 import time
0021 
0022 # import traceback
0023 
0024 # from rucio.client.client import Client as Rucio_Client
0025 # from rucio.common.exception import CannotAuthenticate
0026 
0027 # from idds.client.client import Client
0028 from idds.client.clientmanager import ClientManager
0029 # from idds.common.constants import RequestType, RequestStatus
0030 from idds.common.utils import get_rest_host
0031 # from idds.tests.common import get_example_real_tape_stagein_request
0032 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0033 
0034 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0035 # from idds.workflowv2.workflow import Condition, Workflow
0036 from idds.workflowv2.workflow import Workflow
0037 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039 
0040 
0041 if len(sys.argv) > 1 and sys.argv[1] == "in2p3":
0042     site = 'in2p3'
0043     task_cloud = 'EU'
0044     # task_queue = 'CC-IN2P3_TEST'
0045     task_queue = 'CC-IN2P3_Rubin'
0046     task_queue1 = 'CC-IN2P3_Rubin_Medium'
0047     task_queue2 = 'CC-IN2P3_Rubin_Himem'
0048     task_queue3 = 'CC-IN2P3_Rubin_Big_Himem'
0049     task_queue4 = 'CC-IN2P3_Rubin_Extra_Himem'
0050     task_queue5 = 'CC-IN2P3_Rubin_Merge'
0051 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
0052     site = 'lancs'
0053     task_cloud = 'EU'
0054     # task_queue = 'LANCS_TEST'
0055     task_queue = 'LANCS_Rubin'
0056     task_queue1 = 'LANCS_Rubin_Medium'
0057     task_queue2 = 'LANCS_Rubin_Himem'
0058     task_queue3 = 'LANCS_Rubin_Big_Himem'
0059     task_queue4 = 'LANCS_Rubin_Extra_Himem'
0060     # task_queue3 = 'LANCS_Rubin_Himem'
0061     task_queue5 = 'LANCS_Rubin_Merge'
0062 elif len(sys.argv) > 1 and sys.argv[1] == "ral":
0063     site = 'RAL'
0064     task_cloud = 'EU'
0065     # task_queue = 'RAL_TEST'
0066     task_queue = 'RAL_Rubin'
0067     task_queue1 = 'RAL_Rubin_Medium'
0068     task_queue2 = 'RAL_Rubin_Himem'
0069     task_queue3 = 'RAL_Rubin_Big_Himem'
0070     # task_queue3 = 'RAL_Rubin_Himem'
0071     task_queue4 = 'RAL_Rubin_Merge'
0072     # task_queue5 = 'RAL_Rubin_IO'
0073     task_queue5 = 'RAL_Rubin_Extra_Himem'
0074 else:
0075     site = 'slac'
0076     # task_cloud = 'LSST'
0077     task_cloud = 'US'
0078 
0079     task_queue = 'DOMA_LSST_GOOGLE_TEST'
0080     # task_queue = 'DOMA_LSST_GOOGLE_MERGE'
0081     # task_queue = 'SLAC_TEST'
0082     # task_queue = 'DOMA_LSST_SLAC_TEST'
0083     task_queue = 'SLAC_Rubin'
0084     task_queue1 = 'SLAC_Rubin_Medium'
0085     task_queue2 = 'SLAC_Rubin_Himem'
0086     task_queue3 = 'SLAC_Rubin_Big_Himem'
0087     task_queue4 = 'SLAC_Rubin_Extra_Himem'
0088     task_queue5 = 'SLAC_Rubin_Merge'
0089     # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
0090     # task_queue = 'SLAC_Rubin_Merge'
0091     # task_queue = 'SLAC_TEST'
0092 
0093 
0094 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0095     return ''.join(random.choice(chars) for _ in range(N))
0096 
0097 
0098 class PanDATask(object):
0099     name = None
0100     step = None
0101     dependencies = []
0102 
0103 
0104 def setup_workflow():
0105 
0106     taskN1 = PanDATask()
0107     taskN1.step = "step1"
0108     taskN1.name = site + "_" + taskN1.step + "_" + randStr()
0109     taskN1.dependencies = [
0110         {"name": "00000" + str(k),
0111          "dependencies": [],
0112          "submitted": False} for k in range(10000)
0113     ]
0114 
0115     taskN2 = PanDATask()
0116     taskN2.step = "step2"
0117     taskN2.name = site + "_" + taskN2.step + "_" + randStr()
0118     taskN2.dependencies = [
0119         {
0120             "name": "000010",
0121             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0122                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0123             "submitted": False
0124         },
0125         {
0126             "name": "000011",
0127             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0128                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0129             "submitted": False
0130         },
0131         {
0132             "name": "000012",
0133             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0134                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0135             "submitted": False
0136         },
0137     ]
0138 
0139     taskN2.dependencies += [
0140         {"name": "000013" + str(k),
0141          "dependencies": [],
0142          "submitted": False} for k in range(10000)
0143     ]
0144 
0145     taskN3 = PanDATask()
0146     taskN3.step = "step3"
0147     taskN3.name = site + "_" + taskN3.step + "_" + randStr()
0148     taskN3.dependencies = [
0149         {
0150             "name": "000020",
0151             "dependencies": [],
0152             "submitted": False
0153         },
0154         {
0155             "name": "000021",
0156             "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0157                              {"task": taskN2.name, "inputname": "000011", "available": False}],
0158             "submitted": False
0159         },
0160         {
0161             "name": "000022",
0162             "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0163                              {"task": taskN2.name, "inputname": "000012", "available": False}],
0164             "submitted": False
0165         },
0166         {
0167             "name": "000023",
0168             "dependencies": [],
0169             "submitted": False
0170         },
0171         {
0172             "name": "000024",
0173             "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0174                              {"task": taskN3.name, "inputname": "000023", "available": False}],
0175             "submitted": False
0176         },
0177     ]
0178 
0179     taskN3.dependencies += [
0180         {"name": "000025" + str(k),
0181          "dependencies": [],
0182          "submitted": False} for k in range(10000)
0183     ]
0184 
0185     taskN4 = PanDATask()
0186     taskN4.step = "step4"
0187     taskN4.name = site + "_" + taskN4.step + "_" + randStr()
0188     taskN4.dependencies = [
0189         {"name": "00004" + str(k),
0190          "dependencies": [],
0191          "submitted": False} for k in range(10000)
0192     ]
0193 
0194     taskN5 = PanDATask()
0195     taskN5.step = "step5"
0196     taskN5.name = site + "_" + taskN5.step + "_" + randStr()
0197     taskN5.dependencies = [
0198         {"name": "00005" + str(k),
0199          "dependencies": [],
0200          "submitted": False} for k in range(100)
0201     ]
0202 
0203     taskN6 = PanDATask()
0204     taskN6.step = "step6"
0205     taskN6.name = site + "_" + taskN5.step + "_" + randStr()
0206     taskN6.dependencies = [
0207         {"name": "00005" + str(k),
0208          "dependencies": [],
0209          "submitted": False} for k in range(100)
0210     ]
0211 
0212     work1 = DomaPanDAWork(executable='echo; sleep 180',
0213                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0214                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0215                           log_collections=[], dependency_map=taskN1.dependencies,
0216                           task_name=taskN1.name, task_queue=task_queue,
0217                           encode_command_line=True,
0218                           task_priority=981,
0219                           prodSourceLabel='managed',
0220                           task_log={"dataset": "PandaJob_#{pandaid}/",
0221                                     "destination": "local",
0222                                     "param_type": "log",
0223                                     "token": "local",
0224                                     "type": "template",
0225                                     "value": "log.tgz"},
0226                           task_cloud=task_cloud)
0227     work2 = DomaPanDAWork(executable='echo; sleep 180',        # noqa F841
0228                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0229                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0230                           log_collections=[], dependency_map=taskN2.dependencies,
0231                           task_name=taskN2.name, task_queue=task_queue1,
0232                           encode_command_line=True,
0233                           task_priority=881,
0234                           prodSourceLabel='managed',
0235                           task_log={"dataset": "PandaJob_#{pandaid}/",
0236                                     "destination": "local",
0237                                     "param_type": "log",
0238                                     "token": "local",
0239                                     "type": "template",
0240                                     "value": "log.tgz"},
0241                           task_cloud=task_cloud)
0242     work3 = DomaPanDAWork(executable='echo; sleep 180',        # noqa F841
0243                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0244                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0245                           log_collections=[], dependency_map=taskN3.dependencies,
0246                           task_name=taskN3.name, task_queue=task_queue2,
0247                           encode_command_line=True,
0248                           task_priority=781,
0249                           prodSourceLabel='managed',
0250                           task_log={"dataset": "PandaJob_#{pandaid}/",
0251                                     "destination": "local",
0252                                     "param_type": "log",
0253                                     "token": "local",
0254                                     "type": "template",
0255                                     "value": "log.tgz"},
0256                           task_cloud=task_cloud)
0257 
0258     work4 = DomaPanDAWork(executable='echo; sleep 180',        # noqa F841
0259                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0260                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0261                           log_collections=[], dependency_map=taskN4.dependencies,
0262                           task_name=taskN4.name, task_queue=task_queue3,
0263                           encode_command_line=True,
0264                           task_priority=981,
0265                           prodSourceLabel='managed',
0266                           task_log={"dataset": "PandaJob_#{pandaid}/",
0267                                     "destination": "local",
0268                                     "param_type": "log",
0269                                     "token": "local",
0270                                     "type": "template",
0271                                     "value": "log.tgz"},
0272                           task_cloud=task_cloud)
0273 
0274     work5 = DomaPanDAWork(executable='echo; sleep 180',        # noqa F841
0275                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0276                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0277                           log_collections=[], dependency_map=taskN5.dependencies,
0278                           task_name=taskN5.name, task_queue=task_queue4,
0279                           encode_command_line=True,
0280                           task_priority=981,
0281                           prodSourceLabel='managed',
0282                           task_log={"dataset": "PandaJob_#{pandaid}/",
0283                                     "destination": "local",
0284                                     "param_type": "log",
0285                                     "token": "local",
0286                                     "type": "template",
0287                                     "value": "log.tgz"},
0288                           task_cloud=task_cloud)
0289 
0290     work6 = DomaPanDAWork(executable='echo; sleep 180',        # noqa F841
0291                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0292                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0293                           log_collections=[], dependency_map=taskN6.dependencies,
0294                           task_name=taskN6.name, task_queue=task_queue5,
0295                           encode_command_line=True,
0296                           task_priority=981,
0297                           prodSourceLabel='managed',
0298                           task_log={"dataset": "PandaJob_#{pandaid}/",
0299                                     "destination": "local",
0300                                     "param_type": "log",
0301                                     "token": "local",
0302                                     "type": "template",
0303                                     "value": "log.tgz"},
0304                           task_cloud=task_cloud)
0305 
0306     pending_time = 12
0307     # pending_time = None
0308     workflow = Workflow(pending_time=pending_time)
0309     workflow.add_work(work1)
0310     """
0311     workflow.add_work(work2)
0312     workflow.add_work(work3)
0313     workflow.add_work(work4)
0314     workflow.add_work(work5)
0315     workflow.add_work(work6)
0316     """
0317     workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
0318     return workflow
0319 
0320 
0321 if __name__ == '__main__':
0322     host = get_rest_host()
0323     workflow = setup_workflow()
0324 
0325     wm = ClientManager(host=host)
0326     # wm.set_original_user(user_name="wguandev")
0327     request_id = wm.submit(workflow, use_dataset_name=False)
0328     print(request_id)