Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2022 - 2023
0010 
0011 
0012 """
0013 Test client.
0014 """
0015 
0016 import os
0017 import sys
0018 import string
0019 import random
0020 import time
0021 
0022 # import traceback
0023 
0024 # from rucio.client.client import Client as Rucio_Client
0025 # from rucio.common.exception import CannotAuthenticate
0026 
0027 # from idds.client.client import Client
0028 # from idds.client.clientmanager import ClientManager
0029 # from idds.common.constants import RequestType, RequestStatus
0030 # from idds.common.utils import get_rest_host
0031 # from idds.tests.common import get_example_real_tape_stagein_request
0032 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0033 
0034 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0035 # from idds.workflowv2.workflow import Condition, Workflow
0036 from idds.workflowv2.workflow import Workflow
0037 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0038 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0039 
0040 import idds.common.utils as idds_utils
0041 import pandaclient.idds_api
0042 
0043 # task_cloud = 'LSST'
0044 task_cloud = 'US'
0045 
0046 task_queue = 'DOMA_LSST_GOOGLE_TEST'
0047 # task_queue = 'DOMA_LSST_GOOGLE_MERGE'
0048 # task_queue = 'SLAC_TEST'
0049 # task_queue = 'DOMA_LSST_SLAC_TEST'
0050 task_queue = 'SLAC_Rubin'
0051 # task_queue = 'CC-IN2P3_TEST'
0052 
0053 
0054 def randStr(chars=string.ascii_lowercase + string.digits, N=10):
0055     return ''.join(random.choice(chars) for _ in range(N))
0056 
0057 
0058 class PanDATask(object):
0059     name = None
0060     step = None
0061     dependencies = []
0062 
0063 
0064 def setup_workflow():
0065 
0066     taskN1 = PanDATask()
0067     taskN1.step = "step1"
0068     taskN1.name = taskN1.step + "_" + randStr()
0069     taskN1.dependencies = [
0070         {"name": "00000" + str(k),
0071          "dependencies": [],
0072          "submitted": False} for k in range(6)
0073     ]
0074 
0075     taskN2 = PanDATask()
0076     taskN2.step = "step2"
0077     taskN2.name = taskN2.step + "_" + randStr()
0078     taskN2.dependencies = [
0079         {
0080             "name": "000010",
0081             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0082                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0083             "submitted": False
0084         },
0085         {
0086             "name": "000011",
0087             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0088                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0089             "submitted": False
0090         },
0091         {
0092             "name": "000012",
0093             "dependencies": [{"task": taskN1.name, "inputname": "000001", "available": False},
0094                              {"task": taskN1.name, "inputname": "000002", "available": False}],
0095             "submitted": False
0096         }
0097     ]
0098 
0099     taskN3 = PanDATask()
0100     taskN3.step = "step3"
0101     taskN3.name = taskN3.step + "_" + randStr()
0102     taskN3.dependencies = [
0103         {
0104             "name": "000020",
0105             "dependencies": [],
0106             "submitted": False
0107         },
0108         {
0109             "name": "000021",
0110             "dependencies": [{"task": taskN2.name, "inputname": "000010", "available": False},
0111                              {"task": taskN2.name, "inputname": "000011", "available": False}],
0112             "submitted": False
0113         },
0114         {
0115             "name": "000022",
0116             "dependencies": [{"task": taskN2.name, "inputname": "000011", "available": False},
0117                              {"task": taskN2.name, "inputname": "000012", "available": False}],
0118             "submitted": False
0119         },
0120         {
0121             "name": "000023",
0122             "dependencies": [],
0123             "submitted": False
0124         },
0125         {
0126             "name": "000024",
0127             "dependencies": [{"task": taskN3.name, "inputname": "000021", "available": False},
0128                              {"task": taskN3.name, "inputname": "000023", "available": False}],
0129             "submitted": False
0130         },
0131     ]
0132 
0133     work1 = DomaPanDAWork(executable='echo',
0134                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
0135                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
0136                           log_collections=[], dependency_map=taskN1.dependencies,
0137                           task_name=taskN1.name, task_queue=task_queue,
0138                           encode_command_line=True,
0139                           prodSourceLabel='managed',
0140                           task_log={"dataset": "PandaJob_#{pandaid}/",
0141                                     "destination": "local",
0142                                     "param_type": "log",
0143                                     "token": "local",
0144                                     "type": "template",
0145                                     "value": "log.tgz"},
0146                           task_cloud=task_cloud)
0147     work2 = DomaPanDAWork(executable='echo',
0148                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
0149                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
0150                           log_collections=[], dependency_map=taskN2.dependencies,
0151                           task_name=taskN2.name, task_queue=task_queue,
0152                           encode_command_line=True,
0153                           prodSourceLabel='managed',
0154                           task_log={"dataset": "PandaJob_#{pandaid}/",
0155                                     "destination": "local",
0156                                     "param_type": "log",
0157                                     "token": "local",
0158                                     "type": "template",
0159                                     "value": "log.tgz"},
0160                           task_cloud=task_cloud)
0161     work3 = DomaPanDAWork(executable='echo',
0162                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
0163                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
0164                           log_collections=[], dependency_map=taskN3.dependencies,
0165                           task_name=taskN3.name, task_queue=task_queue,
0166                           encode_command_line=True,
0167                           prodSourceLabel='managed',
0168                           task_log={"dataset": "PandaJob_#{pandaid}/",
0169                                     "destination": "local",
0170                                     "param_type": "log",
0171                                     "token": "local",
0172                                     "type": "template",
0173                                     "value": "log.tgz"},
0174                           task_cloud=task_cloud)
0175 
0176     pending_time = 12
0177     # pending_time = None
0178     workflow = Workflow(pending_time=pending_time)
0179     workflow.add_work(work1)
0180     workflow.add_work(work2)
0181     workflow.add_work(work3)
0182     workflow.name = 'test_workflow.idds.%s.test' % time.time()
0183     return workflow
0184 
0185 
0186 def submit(workflow, idds_server, request_id, signature):
0187 
0188     # wm = ClientManager(host=idds_host)
0189     # wm.setup_json_outputs()
0190     # ret = wm.update_build_request(request_id, signature, workflow)
0191 
0192     c = pandaclient.idds_api.get_api(idds_utils.json_dumps,
0193                                      idds_host=idds_server, compress=True, manager=True)
0194     ret = c.update_build_request(request_id, signature, workflow)
0195     print(ret)
0196     return ret
0197 
0198 
0199 if __name__ == '__main__':
0200     # idds dev host
0201     # idds_host = "https://aipanda160.cern.ch:443/idds"
0202     idds_host = None
0203 
0204     # parse args
0205     request_id = os.environ.get("IDDS_BUILD_REQUEST_ID", None)
0206     signature = os.environ.get("IDDS_BUIL_SIGNATURE", None)
0207 
0208     if request_id is None:
0209         print("IDDS_BUILD_REQUEST_ID is not defined.")
0210         sys.exit(-1)
0211     if signature is None:
0212         print("IDDS_BUIL_SIGNATURE is not defined")
0213         sys.exit(-1)
0214 
0215     workflow = setup_workflow()
0216 
0217     ret = submit(workflow, idds_host, request_id, signature)
0218     print(ret)
0219     # if ret and 'status' in ret:
0220     #     sys.exit(ret['status'])
0221     # else:
0222     #     sys.exit(-1)
0223     # ret = (0, [True, {'request_id': '5756'}])
0224     if ret[0] == 0 and ret[1][0] is True:
0225         sys.exit(0)
0226     sys.exit(-1)