Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023
0010 
0011 
0012 """
0013 Test client.
0014 """
0015 
0016 import sys
0017 import string
0018 import random
0019 import time
0020 
0021 # import traceback
0022 
0023 # from rucio.client.client import Client as Rucio_Client
0024 # from rucio.common.exception import CannotAuthenticate
0025 
0026 # from idds.client.client import Client
0027 from idds.client.clientmanager import ClientManager
0028 # from idds.common.constants import RequestType, RequestStatus
0029 from idds.common.utils import get_rest_host
0030 # from idds.tests.common import get_example_real_tape_stagein_request
0031 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0032 
0033 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0034 # from idds.workflowv2.workflow import Condition, Workflow
0035 from idds.workflowv2.workflow import Workflow
0036 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0037 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0038 
0039 
# Select the site label, PanDA cloud and the five task queues from the first
# command-line argument. "in2p3" and "lancs" are recognised explicitly; any
# other value (or no argument at all) selects the SLAC settings.
if sys.argv[1:2] == ["in2p3"]:
    site, task_cloud = 'in2p3', 'EU'
    # task_queue = 'CC-IN2P3_TEST'
    (task_queue, task_queue1, task_queue2,
     task_queue3, task_queue4) = ('CC-IN2P3_Rubin',
                                  'CC-IN2P3_Rubin_Medium',
                                  'CC-IN2P3_Rubin_Himem',
                                  'CC-IN2P3_Rubin_Extra_Himem',
                                  'CC-IN2P3_Rubin_Merge')
elif sys.argv[1:2] == ["lancs"]:
    site, task_cloud = 'lancs', 'EU'
    # task_queue = 'LANCS_TEST'
    # task_queue3 = 'LANCS_Rubin_Extra_Himem'
    # task_queue3 deliberately resolves to the same queue as task_queue2.
    (task_queue, task_queue1, task_queue2,
     task_queue3, task_queue4) = ('LANCS_Rubin',
                                  'LANCS_Rubin_Medium',
                                  'LANCS_Rubin_Himem',
                                  'LANCS_Rubin_Himem',
                                  'LANCS_Rubin_Merge')
else:
    # task_cloud = 'LSST'
    site, task_cloud = 'slac', 'US'
    # Alternative queues that can be swapped in for task_queue:
    # task_queue = 'DOMA_LSST_GOOGLE_TEST'
    # task_queue = 'DOMA_LSST_GOOGLE_MERGE'
    # task_queue = 'DOMA_LSST_SLAC_TEST'
    # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
    # task_queue = 'SLAC_Rubin_Merge'
    # task_queue = 'SLAC_TEST'
    (task_queue, task_queue1, task_queue2,
     task_queue3, task_queue4) = ('SLAC_Rubin',
                                  'SLAC_Rubin_Medium',
                                  'SLAC_Rubin_Himem',
                                  'SLAC_Rubin_Extra_Himem',
                                  'SLAC_Rubin_Merge')
0076 
0077 # task_cloud = None
0078 
0079 
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of ``N`` characters drawn from ``chars``.

    :param chars: sequence of candidate characters; defaults to lowercase
                  ASCII letters followed by the digits.
    :param N: number of characters to draw (with replacement).
    :returns: the drawn characters joined into one string; empty if ``N``
              is zero.
    """
    picked = []
    for _ in range(N):
        # One independent draw per output position.
        picked.append(random.choice(chars))
    return ''.join(picked)
0082 
0083 
class PanDATask(object):
    """Plain record describing one task of the test workflow.

    Attributes:
        name: task name; ``None`` until assigned.
        step: step label; ``None`` until assigned.
        dependencies: list of job-dependency entries for this task; starts
            empty.
    """

    # Class-level defaults are kept so that reading these attributes on the
    # class itself keeps working.
    name = None
    step = None
    dependencies = []

    def __init__(self):
        # Give every instance its own list. Relying on the class-level list
        # alone would make an in-place change (e.g. ``append``) on one task
        # visible on every other task.
        self.name = None
        self.step = None
        self.dependencies = []
0088 
0089 
def setup_workflow():
    """Build the five-step DOMA PanDA test workflow.

    One ``PanDATask`` is created per step (``step1`` .. ``step5``), each is
    wrapped in a ``DomaPanDAWork`` bound to one of the module-level task
    queues, and all works are added to a single ``Workflow``.

    Dependency layout of the jobs:
      * step1, step4, step5: six jobs each, none with dependencies.
      * step2: three jobs, each depending on step1 jobs 000001 and 000002.
      * step3: five jobs; 000021 and 000022 depend on step2 jobs, 000024
        depends on step3's own jobs 000021 and 000023, and 000020 / 000023
        have no dependencies.

    :returns: the populated ``Workflow``; its name combines the site label
              with the current ``time.time()`` value.
    """

    def new_task(step):
        # The task name is "<site>_<step>_<random suffix>".
        task = PanDATask()
        task.step = step
        task.name = site + "_" + task.step + "_" + randStr()
        return task

    def job(job_name, upstream=()):
        # ``upstream`` is a sequence of (task name, input name) pairs; every
        # call builds fresh dicts so no entry is shared between jobs.
        return {
            "name": job_name,
            "dependencies": [
                {"task": task_name, "inputname": input_name, "available": False}
                for task_name, input_name in upstream
            ],
            "submitted": False,
        }

    def independent_jobs(prefix):
        # Six dependency-free jobs named <prefix>0 .. <prefix>5.
        return [job(prefix + str(k)) for k in range(6)]

    def make_work(task, collection_no, queue, priority):
        # All works share the same settings apart from the pseudo collection
        # number, the queue and the priority.
        suffix = str(collection_no)
        return DomaPanDAWork(
            executable='echo',
            primary_input_collection={'scope': 'pseudo_dataset',
                                      'name': 'pseudo_input_collection#' + suffix},
            output_collections=[{'scope': 'pseudo_dataset',
                                 'name': 'pseudo_output_collection#' + suffix}],
            log_collections=[],
            dependency_map=task.dependencies,
            task_name=task.name,
            task_queue=queue,
            encode_command_line=True,
            task_priority=priority,
            task_rss=3,
            task_rss_retry_offset=3,
            task_rss_retry_step=1000,
            task_rss_max=8000,
            prodSourceLabel='managed',
            task_log={"dataset": "PandaJob_#{pandaid}/",
                      "destination": "local",
                      "param_type": "log",
                      "token": "local",
                      "type": "template",
                      "value": "log.tgz"},
            task_cloud=task_cloud,
        )

    # Tasks are created strictly in step order.
    first = new_task("step1")
    first.dependencies = independent_jobs("00000")

    second = new_task("step2")
    second.dependencies = [
        job(job_name, [(first.name, "000001"), (first.name, "000002")])
        for job_name in ("000010", "000011", "000012")
    ]

    third = new_task("step3")
    third.dependencies = [
        job("000020"),
        job("000021", [(second.name, "000010"), (second.name, "000011")]),
        job("000022", [(second.name, "000011"), (second.name, "000012")]),
        job("000023"),
        # This job depends on two other jobs of the same task.
        job("000024", [(third.name, "000021"), (third.name, "000023")]),
    ]

    fourth = new_task("step4")
    fourth.dependencies = independent_jobs("00004")

    fifth = new_task("step5")
    fifth.dependencies = independent_jobs("00005")

    # (task, pseudo collection number, queue, priority) per work.
    # NOTE(review): step4 and step5 use pseudo collection #1, the same as
    # step1 - confirm this is intended rather than #4 / #5.
    work_specs = [
        (first, 1, task_queue, 981),
        (second, 2, task_queue1, 881),
        (third, 3, task_queue2, 781),
        (fourth, 1, task_queue3, 981),
        (fifth, 1, task_queue4, 981),
    ]
    works = [make_work(*spec) for spec in work_specs]

    pending_time = 12
    # pending_time = None
    workflow = Workflow(pending_time=pending_time)
    for work in works:
        workflow.add_work(work)

    timestamped = 'test_workflow.idds.%s.test' % time.time()
    workflow.name = site + "_" + timestamped
    return workflow
0270 
0271 
if __name__ == '__main__':
    # Resolve the REST endpoint first, then build the workflow to submit.
    rest_host = get_rest_host()
    test_workflow = setup_workflow()

    client_manager = ClientManager(host=rest_host)
    # client_manager.set_original_user(user_name="wguandev")

    # Submit the workflow and print the value returned by submit().
    submitted_id = client_manager.submit(test_workflow, use_dataset_name=False)
    print(submitted_id)