Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Sergey Padolski, <spadolski@bnl.gov>, 2021
0010 # - Wen Guan, <wen.guan@cern.ch>, 2023
0011 
0012 
0013 """
0014 Test client: build a five-step DomaPanDAWork workflow and submit it through the iDDS ClientManager.
0015 """
0016 
0017 import json   # noqa F401
0018 import sys
0019 import string
0020 import random
0021 import time
0022 
0023 # import traceback
0024 
0025 # from rucio.client.client import Client as Rucio_Client
0026 # from rucio.common.exception import CannotAuthenticate
0027 
0028 # from idds.client.client import Client
0029 from idds.client.clientmanager import ClientManager
0030 # from idds.common.constants import RequestType, RequestStatus
0031 from idds.common.utils import get_rest_host
0032 # from idds.tests.common import get_example_real_tape_stagein_request
0033 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0034 
0035 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0036 # from idds.workflowv2.workflow import Condition, Workflow
0037 from idds.workflowv2.workflow import Workflow
0038 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0039 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0040 
0041 
# Site selection: the first command-line argument picks the target site
# ("in2p3" or "lancs"); anything else, or no argument, selects SLAC.
# Each branch defines the cloud plus five queue names (task_queue and
# task_queue1..task_queue4) that setup_workflow() hands to the five tasks.
_site_arg = sys.argv[1] if len(sys.argv) > 1 else None

if _site_arg == "in2p3":
    site, task_cloud = 'in2p3', 'EU'
    # Alternative test queue: 'CC-IN2P3_TEST'
    task_queue, task_queue1, task_queue2, task_queue3, task_queue4 = (
        'CC-IN2P3_Rubin',
        'CC-IN2P3_Rubin_Medium',
        'CC-IN2P3_Rubin_Himem',
        'CC-IN2P3_Rubin_Extra_Himem',
        'CC-IN2P3_Rubin_Merge',
    )
elif _site_arg == "lancs":
    site, task_cloud = 'lancs', 'EU'
    # Alternative test queue: 'LANCS_TEST'
    # NOTE(review): task_queue3 reuses the Himem queue instead of
    # 'LANCS_Rubin_Extra_Himem' -- confirm this override is intended.
    task_queue, task_queue1, task_queue2, task_queue3, task_queue4 = (
        'LANCS_Rubin',
        'LANCS_Rubin_Medium',
        'LANCS_Rubin_Himem',
        'LANCS_Rubin_Himem',
        'LANCS_Rubin_Merge',
    )
else:
    # Alternative cloud: 'LSST'
    site, task_cloud = 'slac', 'US'

    # Every task currently goes to the single SLAC test queue.  The dedicated
    # queues are kept here for reference:
    #   'SLAC_Rubin', 'SLAC_Rubin_Medium', 'SLAC_Rubin_Himem',
    #   'SLAC_Rubin_Extra_Himem', 'SLAC_Rubin_Merge',
    #   'SLAC_Rubin_Extra_Himem_32Cores',
    #   'DOMA_LSST_GOOGLE_TEST', 'DOMA_LSST_GOOGLE_MERGE', 'DOMA_LSST_SLAC_TEST'
    task_queue = 'SLAC_TEST'
    task_queue1 = task_queue2 = task_queue3 = task_queue4 = task_queue
0079 
0080 # task_cloud = None
0081 
0082 
def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a random string of ``N`` characters drawn from ``chars``.

    :param chars: pool of characters to pick from; lowercase ASCII letters
                  and digits by default.
    :param N: number of characters to generate.
    :returns: the generated string (empty when ``N`` is zero or negative).
    """
    picked = []
    for _ in range(N):
        picked.append(random.choice(chars))
    return ''.join(picked)
0085 
0086 
class PanDATask(object):
    """Plain container describing one task of the test workflow.

    Attributes:
        name: task name (``None`` until assigned).
        step: label of the workflow step the task belongs to (``None`` until
              assigned).
        dependencies: list of job entries for the task; every instance gets
              its own list.
    """

    # Class-level defaults kept so that ``PanDATask.name`` / ``PanDATask.step``
    # remain readable on the class itself.
    name = None
    step = None

    def __init__(self, name=None, step=None, dependencies=None):
        """Create a task; all arguments are optional.

        :param name: task name.
        :param step: step label.
        :param dependencies: iterable of job entries; copied into a new list.
        """
        self.name = name
        self.step = step
        # A fresh list per instance: a mutable class attribute would be
        # shared (and mutated) across every task that never reassigns it.
        self.dependencies = [] if dependencies is None else list(dependencies)
0091 
0092 
def _new_task(step):
    """Return a PanDATask for ``step`` named ``<site>_<step>_<random suffix>``."""
    task = PanDATask()
    task.step = step
    task.name = site + "_" + step + "_" + randStr()
    return task


def _job(name, order_id, inputs=(), **extra):
    """Build one entry of a task's dependency map.

    :param name: job name.
    :param order_id: index of the job inside its task.
    :param inputs: iterable of ``(task_name, input_name)`` pairs the job
                   depends on; empty for an independent job.
    :param extra: additional keys stored on the entry (e.g. ``groups``).
    :returns: dict with ``name``, ``order_id``, any extra keys,
              ``dependencies`` and ``submitted`` (always ``False``).
    """
    entry = {"name": name, "order_id": order_id}
    entry.update(extra)
    entry["dependencies"] = [{"task": task_name, "inputname": input_name, "available": False}
                             for task_name, input_name in inputs]
    entry["submitted"] = False
    return entry


def _independent_jobs(prefix, count=6):
    """Return ``count`` dependency-free jobs named ``prefix`` + index."""
    return [_job(prefix + str(k), k) for k in range(count)]


def _make_work(task, queue, collection_id, priority, executable, **extra):
    """Wrap ``task`` into a DomaPanDAWork with the settings shared by all steps.

    :param task: PanDATask supplying the name, step label and dependency map.
    :param queue: queue name passed as ``task_queue``.
    :param collection_id: suffix of the pseudo input/output collection names.
    :param priority: value passed as ``task_priority``.
    :param executable: command line run by the jobs.
    :param extra: additional keyword arguments forwarded to DomaPanDAWork.
    """
    return DomaPanDAWork(executable=executable,
                         primary_input_collection={'scope': 'pseudo_dataset',
                                                   'name': 'pseudo_input_collection#%s' % collection_id},
                         output_collections=[{'scope': 'pseudo_dataset',
                                              'name': 'pseudo_output_collection#%s' % collection_id}],
                         log_collections=[], dependency_map=task.dependencies,
                         task_name=task.name, task_queue=queue,
                         encode_command_line=True,
                         task_priority=priority,
                         es=True,
                         es_label=task.step,
                         prodSourceLabel='managed',
                         # Built inline so every work owns its own dict.
                         task_log={"dataset": "PandaJob_#{pandaid}/",
                                   "destination": "local",
                                   "param_type": "log",
                                   "token": "local",
                                   "type": "template",
                                   "value": "log.tgz"},
                         task_cloud=task_cloud,
                         **extra)


def setup_workflow():
    """Build the five-task test workflow.

    Layout of the tasks:

    * step1, step4, step5: six independent jobs each.
    * step2: three jobs, each depending on step1 jobs ``000001`` and ``000002``.
    * step3: two independent jobs, two jobs depending on step2 jobs, and one
      job (``000024``) depending on two other step3 jobs.

    The module-level ``site``, ``task_cloud`` and ``task_queue*`` settings
    select the names, cloud and queues.

    :returns: a Workflow holding the five DomaPanDAWork objects, added in
              step order.
    """
    taskN1 = _new_task("step1")
    taskN1.dependencies = _independent_jobs("00000")

    taskN2 = _new_task("step2")
    step1_inputs = [(taskN1.name, "000001"), (taskN1.name, "000002")]
    taskN2.dependencies = [_job("00001" + str(k), k, step1_inputs) for k in range(3)]

    taskN3 = _new_task("step3")
    taskN3.dependencies = [
        _job("000020", 0),
        _job("000021", 1, [(taskN2.name, "000010"), (taskN2.name, "000011")]),
        _job("000022", 2, [(taskN2.name, "000011"), (taskN2.name, "000012")]),
        _job("000023", 3),
        # Depends on jobs of its own task; tagged with a "groups" key.
        _job("000024", 4, [(taskN3.name, "000021"), (taskN3.name, "000023")],
             groups=taskN3.name),
    ]

    taskN4 = _new_task("step4")
    taskN4.dependencies = _independent_jobs("00004")

    taskN5 = _new_task("step5")
    taskN5.dependencies = _independent_jobs("00005")

    # step label -> {order_id (as str): job name}.  Only used by the debugging
    # print below; the jobs themselves are pointed at es_map_file.
    es_map = {task.step: {str(item["order_id"]): item["name"] for item in task.dependencies}
              for task in (taskN1, taskN2, taskN3, taskN4, taskN5)}
    # print(json.dumps(es_map))

    # Alternative executables kept for reference:
    # executable = "wget https://wguan-wisc.web.cern.ch/wguan-wisc/doma_es_executor.py; chmod +x doma_es_executor.py; ./doma_es_executor.py echo ${IN/L}"
    # executable = "export RUBIN_ES_CORES=4; echo; RUBIN_ES_MAP=%s; echo ${IN/L}" % json.dumps(es_map)
    es_map_file = "/sdf/data/rubin/panda_jobs/panda_env_pilot/test_rubin_es_map.json"
    executable = "export RUBIN_ES_CORES=4; echo; RUBIN_ES_MAP_FILE=%s; echo ${IN/L}" % es_map_file

    # (task, queue, collection suffix, priority, extra keyword arguments)
    work_specs = [
        (taskN1, task_queue, 1, 981, {"max_events_per_job": 100}),
        (taskN2, task_queue1, 2, 881, {}),
        (taskN3, task_queue2, 3, 781, {}),
        # NOTE(review): step4 and step5 reuse collection suffix 1 -- confirm
        # this is intended rather than 4 and 5.
        (taskN4, task_queue3, 1, 981, {}),
        (taskN5, task_queue4, 1, 981, {}),
    ]
    works = [_make_work(task, queue, collection_id, priority, executable, **extra)
             for task, queue, collection_id, priority, extra in work_specs]

    pending_time = 12
    # pending_time = None
    workflow = Workflow(pending_time=pending_time)
    for work in works:
        workflow.add_work(work)
    workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
    return workflow
0310 
0311 
if __name__ == '__main__':
    # Resolve the REST endpoint, build the test workflow, then submit it and
    # print whatever identifier the submission returns.
    rest_host = get_rest_host()
    test_workflow = setup_workflow()

    client_manager = ClientManager(host=rest_host)
    # client_manager.set_original_user(user_name="wguandev")
    print(client_manager.submit(test_workflow, use_dataset_name=False))