Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Sergey Padolski, <spadolski@bnl.gov>, 2021
0010 # - Wen Guan, <wen.guan@cern.ch>, 2021
0011 
0012 
0013 """
0014 Test client.
0015 """
0016 
0017 import sys
0018 import string
0019 import random
0020 import time
0021 
0022 # import traceback
0023 
0024 # from rucio.client.client import Client as Rucio_Client
0025 # from rucio.common.exception import CannotAuthenticate
0026 
0027 # from idds.client.client import Client
0028 from idds.client.clientmanager import ClientManager
0029 # from idds.common.constants import RequestType, RequestStatus
0030 from idds.common.utils import get_rest_host
0031 # from idds.tests.common import get_example_real_tape_stagein_request
0032 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0033 
0034 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0035 # from idds.workflowv2.workflow import Condition, Workflow
0036 from idds.workflowv2.workflow import Workflow
0037 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039 
0040 
0041 if len(sys.argv) > 1 and sys.argv[1] == "in2p3":
0042     site = 'in2p3'
0043     panda_site = "CC-IN2P3"
0044     task_cloud = 'EU'
0045     # task_queue = 'CC-IN2P3_TEST'
0046     task_queue = 'CC-IN2P3_Rubin'
0047     task_queue1 = 'CC-IN2P3_Rubin_Medium'
0048     task_queue2 = 'CC-IN2P3_Rubin_Himem'
0049     task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
0050     task_queue4 = 'CC-IN2P3_Rubin_Merge'
0051 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
0052     site = 'lancs'
0053     panda_site = "LANCS"
0054     task_cloud = 'EU'
0055     # task_queue = 'LANCS_TEST'
0056     task_queue = 'LANCS_Rubin'
0057     task_queue1 = 'LANCS_Rubin_Medium'
0058     task_queue2 = 'LANCS_Rubin_Himem'
0059     task_queue3 = 'LANCS_Rubin_Extra_Himem'
0060     task_queue3 = 'LANCS_Rubin_Himem'
0061     task_queue4 = 'LANCS_Rubin_Merge'
0062 else:
0063     site = 'slac'
0064     panda_site = "SLAC"
0065     # task_cloud = 'LSST'
0066     task_cloud = 'US'
0067 
0068     task_queue = 'DOMA_LSST_GOOGLE_TEST'
0069     # task_queue = 'DOMA_LSST_GOOGLE_MERGE'
0070     # task_queue = 'SLAC_TEST'
0071     # task_queue = 'DOMA_LSST_SLAC_TEST'
0072     task_queue = 'SLAC_Rubin'
0073     task_queue1 = 'SLAC_Rubin_Medium'
0074     task_queue2 = 'SLAC_Rubin_Himem'
0075     task_queue3 = 'SLAC_Rubin_Extra_Himem'
0076     task_queue4 = 'SLAC_Rubin_Merge'
0077     # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
0078     # task_queue = 'SLAC_Rubin_Merge'
0079     # task_queue = 'SLAC_TEST'
0080     # task_queue4 = task_queue3 = task_queue2 = task_queue1 = task_queue
0081 
0082 # task_cloud = None
0083 
0084 
0085 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0086     return ''.join(random.choice(chars) for _ in range(N))
0087 
0088 
0089 class PanDATask(object):
0090     name = None
0091     step = None
0092     dependencies = []
0093 
0094 
0095 def setup_workflow():
0096 
0097     taskN1 = PanDATask()
0098     taskN1.step = "step1"
0099     taskN1.name = site + "_" + taskN1.step + "_" + randStr()
0100     taskN1.dependencies = [
0101         {"name": "00000" + str(k),
0102          "order_id": k,
0103          "dependencies": [],
0104          "submitted": False} for k in range(6)
0105     ]
0106 
0107     taskN2 = PanDATask()
0108     taskN2.step = "step2"
0109     taskN2.name = site + "_" + taskN2.step + "_" + randStr()
0110     taskN2.dependencies = [
0111         {
0112             "name": "000010",
0113             "order_id": 0,
0114             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0115                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0116             "submitted": False
0117         },
0118         {
0119             "name": "000011",
0120             "order_id": 1,
0121             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0122                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0123             "submitted": False
0124         },
0125         {
0126             "name": "000012",
0127             "order_id": 2,
0128             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0129                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0130             "submitted": False
0131         }
0132     ]
0133 
0134     taskN3 = PanDATask()
0135     taskN3.step = "step3"
0136     taskN3.name = site + "_" + taskN3.step + "_" + randStr()
0137     taskN3.dependencies = [
0138         {
0139             "name": "000020",
0140             "order_id": 0,
0141             "dependencies": [],
0142             "submitted": False
0143         },
0144         {
0145             "name": "000021",
0146             "order_id": 1,
0147             "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0148                              {"task": taskN2.name, "inputname": "000011", "available": False}],
0149             "submitted": False
0150         },
0151         {
0152             "name": "000022",
0153             "order_id": 2,
0154             "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0155                              {"task": taskN2.name, "inputname": "000012", "available": False}],
0156             "submitted": False
0157         },
0158         {
0159             "name": "000023",
0160             "order_id": 3,
0161             "dependencies": [],
0162             "submitted": False
0163         },
0164         {
0165             "name": "000024",
0166             "order_id": 4,
0167             "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0168                              {"task": taskN3.name, "inputname": "000023", "available": False}],
0169             "submitted": False
0170         },
0171     ]
0172 
0173     taskN4 = PanDATask()
0174     taskN4.step = "step4"
0175     taskN4.name = site + "_" + taskN4.step + "_" + randStr()
0176     taskN4.dependencies = [
0177         {"name": "00004" + str(k),
0178          "order_id": k,
0179          "dependencies": [],
0180          "submitted": False} for k in range(6)
0181     ]
0182 
0183     taskN5 = PanDATask()
0184     taskN5.step = "step5"
0185     taskN5.name = site + "_" + taskN5.step + "_" + randStr()
0186     taskN5.dependencies = [
0187         {"name": "00005" + str(k),
0188          "order_id": k,
0189          "dependencies": [],
0190          "submitted": False} for k in range(6)
0191     ]
0192 
0193     work1 = DomaPanDAWork(executable='echo',
0194                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0195                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0196                           log_collections=[], dependency_map=taskN1.dependencies,
0197                           task_name=taskN1.name, task_queue=None, task_site=panda_site, task_rss=3000,
0198                           encode_command_line=True,
0199                           task_priority=981,
0200                           prodSourceLabel='managed',
0201                           task_log={"dataset": "PandaJob_#{pandaid}/",
0202                                     "destination": "local",
0203                                     "param_type": "log",
0204                                     "token": "local",
0205                                     "type": "template",
0206                                     "value": "log.tgz"},
0207                           task_cloud=task_cloud)
0208     work2 = DomaPanDAWork(executable='echo',
0209                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0210                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0211                           log_collections=[], dependency_map=taskN2.dependencies,
0212                           task_name=taskN2.name, task_queue=None, task_site=panda_site, task_rss=7000,
0213                           encode_command_line=True,
0214                           task_priority=881,
0215                           prodSourceLabel='managed',
0216                           task_log={"dataset": "PandaJob_#{pandaid}/",
0217                                     "destination": "local",
0218                                     "param_type": "log",
0219                                     "token": "local",
0220                                     "type": "template",
0221                                     "value": "log.tgz"},
0222                           task_cloud=task_cloud)
0223     work3 = DomaPanDAWork(executable='echo',
0224                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0225                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0226                           log_collections=[], dependency_map=taskN3.dependencies,
0227                           task_name=taskN3.name, task_queue=None, task_site=panda_site, task_rss=14000,
0228                           encode_command_line=True,
0229                           task_priority=781,
0230                           prodSourceLabel='managed',
0231                           task_log={"dataset": "PandaJob_#{pandaid}/",
0232                                     "destination": "local",
0233                                     "param_type": "log",
0234                                     "token": "local",
0235                                     "type": "template",
0236                                     "value": "log.tgz"},
0237                           task_cloud=task_cloud)
0238 
0239     work4 = DomaPanDAWork(executable='echo',
0240                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0241                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0242                           log_collections=[], dependency_map=taskN4.dependencies,
0243                           task_name=taskN4.name, task_queue=None, task_site=panda_site, task_rss=20000,
0244                           encode_command_line=True,
0245                           task_priority=981,
0246                           prodSourceLabel='managed',
0247                           task_log={"dataset": "PandaJob_#{pandaid}/",
0248                                     "destination": "local",
0249                                     "param_type": "log",
0250                                     "token": "local",
0251                                     "type": "template",
0252                                     "value": "log.tgz"},
0253                           task_cloud=task_cloud)
0254 
0255     work5 = DomaPanDAWork(executable='echo',
0256                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0257                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0258                           log_collections=[], dependency_map=taskN5.dependencies,
0259                           task_name=taskN5.name, task_queue=None, task_site=panda_site, task_rss=33000,
0260                           encode_command_line=True,
0261                           task_priority=981,
0262                           prodSourceLabel='managed',
0263                           task_log={"dataset": "PandaJob_#{pandaid}/",
0264                                     "destination": "local",
0265                                     "param_type": "log",
0266                                     "token": "local",
0267                                     "type": "template",
0268                                     "value": "log.tgz"},
0269                           task_cloud=task_cloud)
0270 
0271     pending_time = 12
0272     # pending_time = None
0273     workflow = Workflow(pending_time=pending_time)
0274     workflow.add_work(work1)
0275     workflow.add_work(work2)
0276     workflow.add_work(work3)
0277     workflow.add_work(work4)
0278     workflow.add_work(work5)
0279     workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
0280     return workflow
0281 
0282 
0283 if __name__ == '__main__':
0284     host = get_rest_host()
0285     workflow = setup_workflow()
0286 
0287     wm = ClientManager(host=host)
0288     # wm.set_original_user(user_name="wguandev")
0289     request_id = wm.submit(workflow, use_dataset_name=False)
0290     print(request_id)