Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023
0010 
0011 
0012 """
0013 Test client.
0014 """
0015 
0016 import sys
0017 import string
0018 import random
0019 import time
0020 
0021 # import traceback
0022 
0023 # from rucio.client.client import Client as Rucio_Client
0024 # from rucio.common.exception import CannotAuthenticate
0025 
0026 # from idds.client.client import Client
0027 from idds.client.clientmanager import ClientManager
0028 # from idds.common.constants import RequestType, RequestStatus
0029 from idds.common.utils import get_rest_host
0030 # from idds.tests.common import get_example_real_tape_stagein_request
0031 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0032 
0033 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0034 # from idds.workflowv2.workflow import Condition, Workflow
0035 from idds.workflowv2.workflow import Workflow
0036 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0037 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0038 
0039 
0040 if len(sys.argv) > 1 and sys.argv[1] == "in2p3":
0041     site = 'in2p3'
0042     task_cloud = 'EU'
0043     # task_queue = 'CC-IN2P3_TEST'
0044     task_queue = 'CC-IN2P3_Rubin'
0045     task_queue1 = 'CC-IN2P3_Rubin_Medium'
0046     task_queue2 = 'CC-IN2P3_Rubin_Himem'
0047     task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
0048     task_queue4 = 'CC-IN2P3_Rubin_Merge'
0049 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
0050     site = 'lancs'
0051     task_cloud = 'EU'
0052     # task_queue = 'LANCS_TEST'
0053     task_queue = 'LANCS_Rubin'
0054     task_queue1 = 'LANCS_Rubin_Medium'
0055     task_queue2 = 'LANCS_Rubin_Himem'
0056     task_queue3 = 'LANCS_Rubin_Extra_Himem'
0057     task_queue3 = 'LANCS_Rubin_Himem'
0058     task_queue4 = 'LANCS_Rubin_Merge'
0059 else:
0060     site = 'slac'
0061     # task_cloud = 'LSST'
0062     task_cloud = 'US'
0063 
0064     task_queue = 'DOMA_LSST_GOOGLE_TEST'
0065     # task_queue = 'DOMA_LSST_GOOGLE_MERGE'
0066     # task_queue = 'SLAC_TEST'
0067     # task_queue = 'DOMA_LSST_SLAC_TEST'
0068     task_queue = 'SLAC_Rubin'
0069     task_queue1 = 'SLAC_Rubin_Medium'
0070     task_queue2 = 'SLAC_Rubin_Himem'
0071     task_queue3 = 'SLAC_Rubin_Extra_Himem'
0072     task_queue4 = 'SLAC_Rubin_Merge'
0073     # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
0074     # task_queue = 'SLAC_Rubin_Merge'
0075     # task_queue = 'SLAC_TEST'
0076 
0077 
0078 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0079     return ''.join(random.choice(chars) for _ in range(N))
0080 
0081 
0082 class PanDATask(object):
0083     name = None
0084     step = None
0085     dependencies = []
0086 
0087 
0088 def setup_workflow():
0089 
0090     taskN1 = PanDATask()
0091     taskN1.step = "step1"
0092     taskN1.name = site + "_" + taskN1.step + "_" + randStr()
0093     taskN1.dependencies = [
0094         {"name": "00000" + str(k),
0095          "dependencies": [],
0096          "submitted": False} for k in range(6)
0097     ]
0098 
0099     taskN2 = PanDATask()
0100     taskN2.step = "step2"
0101     taskN2.name = site + "_" + taskN2.step + "_" + randStr()
0102     taskN2.dependencies = [
0103         {
0104             "name": "000010",
0105             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0106                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0107             "submitted": False
0108         },
0109         {
0110             "name": "000011",
0111             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0112                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0113             "submitted": False
0114         },
0115         {
0116             "name": "000012",
0117             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0118                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0119             "submitted": False
0120         }
0121     ]
0122 
0123     taskN3 = PanDATask()
0124     taskN3.step = "step3"
0125     taskN3.name = site + "_" + taskN3.step + "_" + randStr()
0126     taskN3.dependencies = [
0127         {
0128             "name": "000020",
0129             "dependencies": [],
0130             "submitted": False
0131         },
0132         {
0133             "name": "000021",
0134             "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0135                              {"task": taskN2.name, "inputname": "000011", "available": False}],
0136             "submitted": False
0137         },
0138         {
0139             "name": "000022",
0140             "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0141                              {"task": taskN2.name, "inputname": "000012", "available": False}],
0142             "submitted": False
0143         },
0144         {
0145             "name": "000023",
0146             "dependencies": [],
0147             "submitted": False
0148         },
0149         {
0150             "name": "000024",
0151             "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0152                              {"task": taskN3.name, "inputname": "000023", "available": False}],
0153             "submitted": False
0154         },
0155     ]
0156 
0157     taskN4 = PanDATask()
0158     taskN4.step = "step4"
0159     taskN4.name = site + "_" + taskN4.step + "_" + randStr()
0160     taskN4.dependencies = [
0161         {"name": "00004" * 1000 + str(k),
0162          "dependencies": [],
0163          "submitted": False} for k in range(6)
0164     ]
0165 
0166     taskN5 = PanDATask()
0167     taskN5.step = "step5"
0168     taskN5.name = site + "_" + taskN5.step + "_" + randStr()
0169     taskN5.dependencies = [
0170         {"name": "00005" + str(k),
0171          "dependencies": [],
0172          "submitted": False} for k in range(6)
0173     ]
0174 
0175     work1 = DomaPanDAWork(executable='echo',
0176                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0177                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0178                           log_collections=[], dependency_map=taskN1.dependencies,
0179                           task_name=taskN1.name, task_queue=task_queue,
0180                           encode_command_line=True,
0181                           task_priority=981,
0182                           prodSourceLabel='managed',
0183                           task_log={"dataset": "PandaJob_#{pandaid}/",
0184                                     "destination": "local",
0185                                     "param_type": "log",
0186                                     "token": "local",
0187                                     "type": "template",
0188                                     "value": "log.tgz"},
0189                           task_cloud=task_cloud)
0190     work2 = DomaPanDAWork(executable='echo',
0191                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0192                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0193                           log_collections=[], dependency_map=taskN2.dependencies,
0194                           task_name=taskN2.name, task_queue=task_queue1,
0195                           encode_command_line=True,
0196                           task_priority=881,
0197                           prodSourceLabel='managed',
0198                           task_log={"dataset": "PandaJob_#{pandaid}/",
0199                                     "destination": "local",
0200                                     "param_type": "log",
0201                                     "token": "local",
0202                                     "type": "template",
0203                                     "value": "log.tgz"},
0204                           task_cloud=task_cloud)
0205     work3 = DomaPanDAWork(executable='echo',
0206                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0207                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0208                           log_collections=[], dependency_map=taskN3.dependencies,
0209                           task_name=taskN3.name, task_queue=task_queue2,
0210                           encode_command_line=True,
0211                           task_priority=781,
0212                           prodSourceLabel='managed',
0213                           task_log={"dataset": "PandaJob_#{pandaid}/",
0214                                     "destination": "local",
0215                                     "param_type": "log",
0216                                     "token": "local",
0217                                     "type": "template",
0218                                     "value": "log.tgz"},
0219                           task_cloud=task_cloud)
0220 
0221     work4 = DomaPanDAWork(executable='echo',
0222                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0223                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0224                           log_collections=[], dependency_map=taskN4.dependencies,
0225                           task_name=taskN4.name, task_queue=task_queue3,
0226                           encode_command_line=True,
0227                           task_priority=981,
0228                           prodSourceLabel='managed',
0229                           task_log={"dataset": "PandaJob_#{pandaid}/",
0230                                     "destination": "local",
0231                                     "param_type": "log",
0232                                     "token": "local",
0233                                     "type": "template",
0234                                     "value": "log.tgz"},
0235                           task_cloud=task_cloud)
0236 
0237     work5 = DomaPanDAWork(executable='echo',
0238                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0239                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0240                           log_collections=[], dependency_map=taskN5.dependencies,
0241                           task_name=taskN5.name, task_queue=task_queue4,
0242                           encode_command_line=True,
0243                           task_priority=981,
0244                           prodSourceLabel='managed',
0245                           task_log={"dataset": "PandaJob_#{pandaid}/",
0246                                     "destination": "local",
0247                                     "param_type": "log",
0248                                     "token": "local",
0249                                     "type": "template",
0250                                     "value": "log.tgz"},
0251                           task_cloud=task_cloud)
0252 
0253     pending_time = 12
0254     # pending_time = None
0255     workflow = Workflow(pending_time=pending_time)
0256     workflow.add_work(work1)
0257     workflow.add_work(work2)
0258     workflow.add_work(work3)
0259     workflow.add_work(work4)
0260     workflow.add_work(work5)
0261     workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
0262     return workflow
0263 
0264 
0265 if __name__ == '__main__':
0266     host = get_rest_host()
0267     workflow = setup_workflow()
0268 
0269     wm = ClientManager(host=host)
0270     # wm.set_original_user(user_name="wguandev")
0271     request_id = wm.submit(workflow, use_dataset_name=False)
0272     print(request_id)