Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Sergey Padolski, <spadolski@bnl.gov>, 2021
0010 # - Wen Guan, <wen.guan@cern.ch>, 2021
0011 
0012 
0013 """
0014 Test client.
0015 """
0016 
0017 import sys
0018 import string
0019 import random
0020 import time
0021 
0022 # import traceback
0023 
0024 # from rucio.client.client import Client as Rucio_Client
0025 # from rucio.common.exception import CannotAuthenticate
0026 
0027 # from idds.client.client import Client
0028 from idds.client.clientmanager import ClientManager
0029 # from idds.common.constants import RequestType, RequestStatus
0030 from idds.common.utils import get_rest_host
0031 # from idds.tests.common import get_example_real_tape_stagein_request
0032 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0033 
0034 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0035 # from idds.workflowv2.workflow import Condition, Workflow
0036 from idds.workflowv2.workflow import Workflow
0037 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039 
0040 
0041 if len(sys.argv) > 1 and sys.argv[1] == "in2p3":
0042     site = 'in2p3'
0043     task_cloud = 'EU'
0044     # task_queue = 'CC-IN2P3_TEST'
0045     task_queue = 'CC-IN2P3_Rubin'
0046     task_queue1 = 'CC-IN2P3_Rubin'
0047     task_queue2 = 'CC-IN2P3_Rubin'
0048     task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
0049     task_queue4 = 'CC-IN2P3_Rubin_Merge'
0050     # task_queue5 = 'CC-IN2P3_Rubin_IO'
0051     task_queue5 = 'CC-IN2P3_Rubin_Extra_Himem'
0052 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
0053     site = 'lancs'
0054     task_cloud = 'EU'
0055     # task_queue = 'LANCS_TEST'
0056     task_queue = 'LANCS_Rubin'
0057     task_queue1 = 'LANCS_Rubin'
0058     task_queue2 = 'LANCS_Rubin'
0059     task_queue3 = 'LANCS_Rubin_Extra_Himem'
0060     # task_queue3 = 'LANCS_Rubin_Himem'
0061     task_queue4 = 'LANCS_Rubin_Merge'
0062     # task_queue5 = 'LANCS_Rubin_IO'
0063     task_queue5 = 'LANCS_Rubin_Extra_Himem'
0064 elif len(sys.argv) > 1 and sys.argv[1] == "ral":
0065     site = 'RAL'
0066     task_cloud = 'EU'
0067     # task_queue = 'RAL_TEST'
0068     task_queue = 'RAL_Rubin'
0069     task_queue1 = 'RAL_Rubin'
0070     task_queue2 = 'RAL_Rubin'
0071     task_queue1 = task_queue
0072     task_queue2 = task_queue
0073     task_queue3 = 'RAL_Rubin_Extra_Himem'
0074     # task_queue3 = 'RAL_Rubin_Himem'
0075     task_queue4 = 'RAL_Rubin_Merge'
0076     # task_queue5 = 'RAL_Rubin_IO'
0077     task_queue5 = 'RAL_Rubin_Extra_Himem'
0078 else:
0079     site = 'slac'
0080     # task_cloud = 'LSST'
0081     task_cloud = 'US'
0082 
0083     task_queue = 'DOMA_LSST_GOOGLE_TEST'
0084     # task_queue = 'DOMA_LSST_GOOGLE_MERGE'
0085     # task_queue = 'SLAC_TEST'
0086     # task_queue = 'DOMA_LSST_SLAC_TEST'
0087     task_queue = 'SLAC_Rubin'
0088     task_queue1 = 'SLAC_Rubin'
0089     task_queue2 = 'SLAC_Rubin'
0090     task_queue3 = 'SLAC_Rubin_Extra_Himem'
0091     task_queue4 = 'SLAC_Rubin_Merge'
0092     task_queue5 = 'SLAC_Rubin'
0093     # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
0094     # task_queue = 'SLAC_Rubin_Merge'
0095     task_queue2 = 'SLAC_Rubin'
0096     # task_queue4 = task_queue3 = task_queue2 = task_queue1 = task_queue
0097 
0098 # task_cloud = None
0099 
0100 
0101 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0102     return ''.join(random.choice(chars) for _ in range(N))
0103 
0104 
0105 class PanDATask(object):
0106     name = None
0107     step = None
0108     dependencies = []
0109 
0110 
0111 def setup_workflow():
0112 
0113     taskN1 = PanDATask()
0114     taskN1.step = "step1"
0115     taskN1.name = site + "_" + taskN1.step + "_" + randStr()
0116     taskN1.dependencies = [
0117         {"name": "00000" + str(k),
0118          "order_id": k,
0119          "dependencies": [],
0120          "submitted": False} for k in range(6)
0121     ]
0122 
0123     taskN2 = PanDATask()
0124     taskN2.step = "step2"
0125     taskN2.name = site + "_" + taskN2.step + "_" + randStr()
0126     taskN2.dependencies = [
0127         {
0128             "name": "000010",
0129             "order_id": 0,
0130             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0131                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0132             "submitted": False
0133         },
0134         {
0135             "name": "000011",
0136             "order_id": 1,
0137             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0138                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0139             "submitted": False
0140         },
0141         {
0142             "name": "000012",
0143             "order_id": 2,
0144             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0145                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0146             "submitted": False
0147         }
0148     ]
0149 
0150     taskN3 = PanDATask()
0151     taskN3.step = "step3"
0152     taskN3.name = site + "_" + taskN3.step + "_" + randStr()
0153     taskN3.dependencies = [
0154         {
0155             "name": "000020",
0156             "order_id": 0,
0157             "dependencies": [],
0158             "submitted": False
0159         },
0160         {
0161             "name": "000021",
0162             "order_id": 1,
0163             "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0164                              {"task": taskN2.name, "inputname": "000011", "available": False}],
0165             "submitted": False
0166         },
0167         {
0168             "name": "000022",
0169             "order_id": 2,
0170             "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0171                              {"task": taskN2.name, "inputname": "000012", "available": False}],
0172             "submitted": False
0173         },
0174         {
0175             "name": "000023",
0176             "order_id": 3,
0177             "dependencies": [],
0178             "submitted": False
0179         },
0180         {
0181             "name": "000024",
0182             "order_id": 4,
0183             "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0184                              {"task": taskN3.name, "inputname": "000023", "available": False}],
0185             "submitted": False
0186         },
0187     ]
0188 
0189     taskN4 = PanDATask()
0190     taskN4.step = "step4"
0191     taskN4.name = site + "_" + taskN4.step + "_" + randStr()
0192     taskN4.dependencies = [
0193         {"name": "00004" + str(k),
0194          "order_id": k,
0195          "dependencies": [],
0196          "submitted": False} for k in range(6)
0197     ]
0198 
0199     taskN5 = PanDATask()
0200     taskN5.step = "step5"
0201     taskN5.name = site + "_" + taskN5.step + "_" + randStr()
0202     taskN5.dependencies = [
0203         {"name": "00005" + str(k),
0204          "order_id": k,
0205          "dependencies": [],
0206          "submitted": False} for k in range(6)
0207     ]
0208 
0209     taskN6 = PanDATask()
0210     taskN6.step = "step6"
0211     taskN6.name = site + "_" + taskN6.step + "_" + randStr()
0212     taskN6.dependencies = [
0213         {"name": "00006" + str(k),
0214          "order_id": k,
0215          "dependencies": [],
0216          "submitted": False} for k in range(6)
0217     ]
0218 
0219     taskN7 = PanDATask()
0220     taskN7.step = "step7"
0221     taskN7.name = site + "_" + taskN7.step + "_" + randStr()
0222     taskN7.dependencies = [
0223         {"name": "00007" + str(k),
0224          "order_id": k,
0225          "dependencies": [],
0226          "submitted": False} for k in range(6)
0227     ]
0228 
0229     work1 = DomaPanDAWork(executable='echo',
0230                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0231                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0232                           log_collections=[], dependency_map=taskN1.dependencies,
0233                           task_name=taskN1.name, task_queue=task_queue,
0234                           encode_command_line=True,
0235                           task_priority=981,
0236                           prodSourceLabel='managed',
0237                           task_log={"dataset": "PandaJob_#{pandaid}/",
0238                                     "destination": "local",
0239                                     "param_type": "log",
0240                                     "token": "local",
0241                                     "type": "template",
0242                                     "value": "log.tgz"},
0243                           task_cloud=task_cloud)
0244     work2 = DomaPanDAWork(executable='echo',
0245                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0246                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0247                           log_collections=[], dependency_map=taskN2.dependencies,
0248                           task_name=taskN2.name, task_queue=task_queue1,
0249                           encode_command_line=True,
0250                           task_priority=881,
0251                           prodSourceLabel='managed',
0252                           task_log={"dataset": "PandaJob_#{pandaid}/",
0253                                     "destination": "local",
0254                                     "param_type": "log",
0255                                     "token": "local",
0256                                     "type": "template",
0257                                     "value": "log.tgz"},
0258                           task_cloud=task_cloud)
0259     work3 = DomaPanDAWork(executable='echo',
0260                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0261                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0262                           log_collections=[], dependency_map=taskN3.dependencies,
0263                           task_name=taskN3.name, task_queue=task_queue2,
0264                           encode_command_line=True,
0265                           task_priority=781,
0266                           prodSourceLabel='managed',
0267                           task_log={"dataset": "PandaJob_#{pandaid}/",
0268                                     "destination": "local",
0269                                     "param_type": "log",
0270                                     "token": "local",
0271                                     "type": "template",
0272                                     "value": "log.tgz"},
0273                           task_cloud=task_cloud)
0274 
0275     work4 = DomaPanDAWork(executable='echo',
0276                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0277                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0278                           log_collections=[], dependency_map=taskN4.dependencies,
0279                           task_name=taskN4.name, task_queue=task_queue3,
0280                           encode_command_line=True,
0281                           task_priority=981,
0282                           core_count=2,
0283                           task_rss=32000,
0284                           prodSourceLabel='managed',
0285                           task_log={"dataset": "PandaJob_#{pandaid}/",
0286                                     "destination": "local",
0287                                     "param_type": "log",
0288                                     "token": "local",
0289                                     "type": "template",
0290                                     "value": "log.tgz"},
0291                           task_cloud=task_cloud)
0292 
0293     work5 = DomaPanDAWork(executable='echo',
0294                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0295                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0296                           log_collections=[], dependency_map=taskN5.dependencies,
0297                           task_name=taskN5.name, task_queue=task_queue4,
0298                           encode_command_line=True,
0299                           task_priority=981,
0300                           prodSourceLabel='managed',
0301                           task_log={"dataset": "PandaJob_#{pandaid}/",
0302                                     "destination": "local",
0303                                     "param_type": "log",
0304                                     "token": "local",
0305                                     "type": "template",
0306                                     "value": "log.tgz"},
0307                           task_cloud=task_cloud)
0308 
0309     work6 = DomaPanDAWork(executable='echo',
0310                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0311                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0312                           log_collections=[], dependency_map=taskN6.dependencies,
0313                           task_name=taskN6.name, task_queue=task_queue5,
0314                           encode_command_line=True,
0315                           task_priority=981,
0316                           prodSourceLabel='managed',
0317                           task_log={"dataset": "PandaJob_#{pandaid}/",
0318                                     "destination": "local",
0319                                     "param_type": "log",
0320                                     "token": "local",
0321                                     "type": "template",
0322                                     "value": "log.tgz"},
0323                           task_cloud=task_cloud)
0324 
0325     work7 = DomaPanDAWork(executable='echo',
0326                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0327                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0328                           log_collections=[], dependency_map=taskN7.dependencies,
0329                           task_name=taskN7.name, task_queue=task_queue3,
0330                           encode_command_line=True,
0331                           task_priority=981,
0332                           core_count=2,
0333                           prodSourceLabel='managed',
0334                           task_log={"dataset": "PandaJob_#{pandaid}/",
0335                                     "destination": "local",
0336                                     "param_type": "log",
0337                                     "token": "local",
0338                                     "type": "template",
0339                                     "value": "log.tgz"},
0340                           task_cloud=task_cloud)
0341 
0342     pending_time = 12
0343     # pending_time = None
0344     workflow = Workflow(pending_time=pending_time)
0345     workflow.add_work(work1)
0346     workflow.add_work(work2)
0347     workflow.add_work(work3)
0348     workflow.add_work(work4)
0349     workflow.add_work(work5)
0350     workflow.add_work(work6)
0351     workflow.add_work(work7)
0352     workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
0353     return workflow
0354 
0355 
0356 if __name__ == '__main__':
0357     host = get_rest_host()
0358     workflow = setup_workflow()
0359 
0360     wm = ClientManager(host=host)
0361     # wm.set_original_user(user_name="wguandev")
0362     request_id = wm.submit(workflow, use_dataset_name=False)
0363     print(request_id)