Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2022 - 2023
0010 
0011 
0012 """
0013 Test client.
0014 """
0015 
0016 import os
0017 import sys
0018 import string
0019 import random
0020 import time
0021 
0022 # import traceback
0023 
0024 # from rucio.client.client import Client as Rucio_Client
0025 # from rucio.common.exception import CannotAuthenticate
0026 
0027 # from idds.client.client import Client
0028 from idds.client.clientmanager import ClientManager
0029 # from idds.common.constants import RequestType, RequestStatus
0030 # from idds.common.utils import get_rest_host
0031 # from idds.tests.common import get_example_real_tape_stagein_request
0032 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0033 
0034 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0035 # from idds.workflowv2.workflow import Condition, Workflow
0036 from idds.workflowv2.workflow import Workflow
0037 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039 
# Cloud passed to every DomaPanDAWork built below.
# Alternative value: 'LSST'
task_cloud = 'US'

# Queue passed to every DomaPanDAWork built below.
# Alternative values:
#   'DOMA_LSST_GOOGLE_TEST'
#   'DOMA_LSST_GOOGLE_MERGE'
#   'SLAC_TEST'
#   'DOMA_LSST_SLAC_TEST'
#   'CC-IN2P3_TEST'
task_queue = 'SLAC_Rubin'
0049 
0050 
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of ``N`` characters drawn from ``chars``.

    By default the alphabet is the lowercase ASCII letters plus the digits
    and the result is 10 characters long.
    """
    picked = []
    for _ in range(N):
        picked.append(random.choice(chars))
    return ''.join(picked)
0053 
0054 
class PanDATask(object):
    """Plain attribute container describing one task of the test workflow.

    Attributes:
        name: task name; ``None`` until the caller assigns it.
        step: step label; ``None`` until the caller assigns it.
        dependencies: list of per-job dependency entries; empty until the
            caller fills it.
    """

    # Class-level defaults are kept so that reading these attributes on the
    # class itself keeps working. The list here must never be mutated in
    # place; instances get their own list in __init__.
    name = None
    step = None
    dependencies = []

    def __init__(self):
        # A mutable class attribute is shared by every instance, so an
        # in-place append through one task would silently show up in all
        # others. Give each instance its own state instead.
        self.name = None
        self.step = None
        self.dependencies = []
0059 
0060 
def setup_workflow():
    """Assemble the three-step test workflow.

    Three ``PanDATask`` descriptions (``step1`` .. ``step3``) are created
    with randomly suffixed names, each one is wrapped in a ``DomaPanDAWork``
    and the works are added, in step order, to a new ``Workflow``.

    Returns:
        The populated ``Workflow``; its name embeds ``time.time()``.
    """

    def make_task(step):
        # The task name is the step label plus a random suffix.
        task = PanDATask()
        task.step = step
        task.name = step + "_" + randStr()
        return task

    def make_job(job_name, *inputs):
        # ``inputs`` are (task name, input name) pairs recorded as the
        # job's dependencies; no pairs means an empty dependency list.
        return {
            "name": job_name,
            "dependencies": [
                {"task": task_name, "inputname": input_name, "available": False}
                for task_name, input_name in inputs
            ],
            "submitted": False,
        }

    def make_work(index, task):
        # All works share the same settings; only the collection suffix,
        # the dependency map and the task name differ.
        return DomaPanDAWork(
            executable='echo',
            primary_input_collection={'scope': 'pseudo_dataset',
                                      'name': 'pseudo_input_collection#%d' % index},
            output_collections=[{'scope': 'pseudo_dataset',
                                 'name': 'pseudo_output_collection#%d' % index}],
            log_collections=[],
            dependency_map=task.dependencies,
            task_name=task.name,
            task_queue=task_queue,
            encode_command_line=True,
            prodSourceLabel='managed',
            task_log={"dataset": "PandaJob_#{pandaid}/",
                      "destination": "local",
                      "param_type": "log",
                      "token": "local",
                      "type": "template",
                      "value": "log.tgz"},
            task_cloud=task_cloud,
        )

    # step1: six jobs ("000000" .. "000005") without dependencies.
    first = make_task("step1")
    first.dependencies = [make_job("00000" + str(idx)) for idx in range(6)]

    # step2: every job lists the same two step1 inputs.
    second = make_task("step2")
    second.dependencies = [
        make_job(job_name, (first.name, "000001"), (first.name, "000002"))
        for job_name in ("000010", "000011", "000012")
    ]

    # step3: a mix of jobs without dependencies, jobs listing step2 inputs,
    # and one job ("000024") listing inputs from step3 itself.
    third = make_task("step3")
    third.dependencies = [
        make_job("000020"),
        make_job("000021", (second.name, "000010"), (second.name, "000011")),
        make_job("000022", (second.name, "000011"), (second.name, "000012")),
        make_job("000023"),
        make_job("000024", (third.name, "000021"), (third.name, "000023")),
    ]

    works = [make_work(index, task)
             for index, task in enumerate((first, second, third), start=1)]

    pending_time = 12
    # pending_time = None
    workflow = Workflow(pending_time=pending_time)
    for work in works:
        workflow.add_work(work)
    workflow.name = 'test_workflow.idds.%s.test' % time.time()
    return workflow
0181 
0182 
if __name__ == '__main__':
    # idds dev host
    idds_host = "https://aipanda160.cern.ch:443/idds"

    # The build request id and its signature are read from the environment.
    # NOTE(review): "IDDS_BUIL_SIGNATURE" looks like a typo for
    # "IDDS_BUILD_SIGNATURE"; it is kept because it must match whatever
    # sets the variable - confirm before renaming.
    request_id = os.environ.get("IDDS_BUILD_REQUEST_ID")
    signature = os.environ.get("IDDS_BUIL_SIGNATURE")

    required = (
        (request_id, "IDDS_BUILD_REQUEST_ID is not defined."),
        (signature, "IDDS_BUIL_SIGNATURE is not defined"),
    )
    for value, complaint in required:
        if value is None:
            print(complaint)
            sys.exit(-1)

    workflow = setup_workflow()

    wm = ClientManager(host=idds_host)
    wm.setup_json_outputs()
    ret = wm.update_build_request(request_id, signature, workflow)
    print(ret)

    # Exit with the returned 'status' when there is one, otherwise with -1.
    exit_code = ret['status'] if ret and 'status' in ret else -1
    sys.exit(exit_code)