Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2022 - 2023
0010 
0011 
0012 """
0013 Test client.
0014 """
0015 
0016 import os
0017 import tarfile
0018 import time
0019 import uuid
0020 
0021 # from pandaclient import Client
0022 
0023 # import traceback
0024 
0025 # from rucio.client.client import Client as Rucio_Client
0026 # from rucio.common.exception import CannotAuthenticate
0027 
0028 # from idds.client.client import Client
0029 from idds.client.clientmanager import ClientManager
0030 # from idds.common.constants import RequestType, RequestStatus
0031 from idds.common.utils import get_rest_host
0032 # from idds.tests.common import get_example_real_tape_stagein_request
0033 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0034 
0035 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0036 # from idds.workflowv2.workflow import Condition, Workflow
0037 from idds.workflowv2.workflow import Workflow
0038 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0039 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0040 
# Cloud name handed to DomaPanDAWork as ``task_cloud`` (alternative: 'LSST').
task_cloud = 'US'

# Queue name handed to DomaPanDAWork as ``task_queue``.
# Other queues that have been used with this test:
#   'DOMA_LSST_GOOGLE_TEST', 'DOMA_LSST_GOOGLE_MERGE', 'SLAC_TEST',
#   'DOMA_LSST_SLAC_TEST', 'CC-IN2P3_TEST'
task_queue = 'SLAC_Rubin'


# PanDA endpoints exported through the environment.  PANDACACHE_URL is read
# back by copy_files_to_pandacache() and is set to the SSL server endpoint.
os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda'
os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
0055 
0056 
def get_local_pfns():
    """
    Collect the local paths that go into the build archive.

    The returned list holds, in order:
    the test scripts located next to this file (a missing script is
    reported and left out), four iDDS source directories, and the two
    Rubin build helpers.  The directories and helpers are resolved against
    the directory four levels above this file's directory and are added
    without an existence check.

    :returns: list of path strings.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    pfns = []

    for script in ('test_domapanda.py', 'test_domapanda_build.py'):
        candidate = os.path.join(here, script)
        if not os.path.exists(candidate):
            print("Not exist: %s" % candidate)
            continue
        print("Adding: %s" % candidate)
        pfns.append(candidate)

    # climb four directory levels up from this file's directory
    root = here
    for _ in range(4):
        root = os.path.dirname(root)

    # add iDDS client code, to be compiled on worker node.
    pfns.extend(os.path.join(root, package)
                for package in ('client', 'common', 'workflow', 'doma'))

    # command-line builder first, then the main test script
    pfns.extend(os.path.join(root, helper)
                for helper in ('main/tools/rubin/cmdline_builder.py',
                               'main/tools/rubin/test_build.sh'))
    return pfns
0080 
0081 
def create_archive_file(output_filename, local_pfns, work_dir='/tmp/wguan'):
    """
    Pack files and directories into a gzip-compressed tar archive.

    :param output_filename: path of the archive to write.  A relative name
                            is placed under ``work_dir``.
    :param local_pfns: iterable of local paths to add.  Every entry is
                       stored under its base name and symbolic links are
                       followed (``dereference=True``).
    :param work_dir: directory used for relative ``output_filename`` values.

    :returns: the full path of the archive that was written.
    """
    if not os.path.isabs(output_filename):
        output_filename = os.path.join(work_dir, output_filename)

    # tarfile.open() does not create missing directories, so make sure the
    # target directory exists before the archive is opened for writing.
    parent_dir = os.path.dirname(output_filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with tarfile.open(output_filename, "w:gz", dereference=True) as tar:
        for local_pfn in local_pfns:
            tar.add(local_pfn, arcname=os.path.basename(local_pfn))
    return output_filename
0091 
0092 
def copy_files_to_pandacache(filename):
    """
    Upload a file with the PanDA client and return its cache URL.

    :param filename: local path of the file to upload.

    :returns: ``<PANDACACHE_URL>/cache/<base name>`` on success, where the
              base name is taken from the server reply when it starts with
              ``NewFileName:`` and from ``filename`` otherwise;
              ``None`` when the reply is neither that nor ``"True"``.
    """
    # imported inside the function, so pandaclient is only needed when an
    # upload is actually performed
    from pandaclient import Client

    # NOTE(review): the second positional argument is presumably the
    # verbose flag -- confirm against the pandaclient API.
    status, out = Client.putFile(filename, True)
    print("copy_files_to_pandacache: status: %s, out: %s" % (status, out))
    if out.startswith("NewFileName:"):
        # found the same input sandbox to reuse
        filename = out.split(":")[-1]
    elif out != "True":
        print(out)
        return None

    # A URL always uses '/' as separator, so it is assembled explicitly
    # instead of with os.path.join (which follows the local OS convention).
    cache_url = os.environ["PANDACACHE_URL"].rstrip("/")
    return "/".join([cache_url, "cache", os.path.basename(filename)])
0108 
0109 
def setup_workflow():
    """
    Build a one-task workflow that runs the Rubin build script.

    The local sources returned by get_local_pfns() are packed into a
    uniquely named ``jobO.<uuid>.tar.gz`` archive, the archive is uploaded
    with copy_files_to_pandacache(), and a single DomaPanDAWork of type
    ``lsst_build`` is created whose command line downloads
    ``cmdline_builder.py`` and runs it with the uploaded archive URL and
    ``./test_build.sh`` as arguments.

    :returns: a Workflow holding the build work.
    :raises RuntimeError: when the archive could not be uploaded.
    """
    local_pfns = get_local_pfns()
    archive_name = "jobO.%s.tar.gz" % str(uuid.uuid4())
    archive_file = create_archive_file(archive_name, local_pfns)
    print("archive file: %s" % archive_file)
    output_filename = copy_files_to_pandacache(archive_file)
    print("pandacache file: %s" % output_filename)
    if output_filename is None:
        # Without this check the command-line assembly below fails with an
        # unrelated-looking TypeError (str + None).
        raise RuntimeError("Failed to upload %s to the PanDA cache" % archive_file)

    command_parts = [
        # load the LSST software stack
        'unset PYTHONPATH; source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/w_2022_53/loadLSST.bash; pwd; ls -al; setup lsst_distrib;',   # noqa E501
        # export OIDC token settings when PANDA_AUTH_DIR/PANDA_AUTH_ORIGIN are set, otherwise select x509 proxy
        'if [[ ! -z "${PANDA_AUTH_DIR}" ]] && [[ ! -z "${PANDA_AUTH_ORIGIN}" ]]; then export PANDA_AUTH_ID_TOKEN=$(cat $PANDA_AUTH_DIR); export PANDA_AUTH_VO=$PANDA_AUTH_ORIGIN; export IDDS_OIDC_TOKEN=$(cat $PANDA_AUTH_DIR); export IDDS_VO=$PANDA_AUTH_ORIGIN; export PANDA_AUTH=oidc; else unset PANDA_AUTH; export IDDS_AUTH_TYPE=x509_proxy; fi;',   # noqa E501
        # fetch the command-line builder
        # (alternative location: https://storage.googleapis.com/drp-us-central1-containers/cmdline_builder.py)
        'wget https://wguan-wisc.web.cern.ch/wguan-wisc/cmdline_builder.py;',
        'export IDDS_HOST=https://aipanda160.cern.ch:443/idds;',
        'python3 cmdline_builder.py ',
        # arguments: uploaded archive URL and the script to run
        output_filename + ' ' + './test_build.sh',
    ]
    executable = ''.join(command_parts)

    work1 = DomaPanDAWork(executable=executable,
                          task_type='lsst_build',
                          primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                          output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
                          log_collections=[], dependency_map=None,
                          task_name="build_task", task_queue=task_queue,
                          encode_command_line=True,
                          prodSourceLabel='managed',
                          task_log={"dataset": "PandaJob_#{pandaid}/",
                                    "destination": "local",
                                    "param_type": "log",
                                    "token": "local",
                                    "type": "template",
                                    "value": "log.tgz"},
                          task_cloud=task_cloud)

    # pending_time may also be None
    pending_time = 12
    workflow = Workflow(pending_time=pending_time)

    workflow.add_work(work1)
    # the current timestamp is embedded in the workflow name
    workflow.name = 'test_workflow.idds.%s.test' % time.time()
    return workflow
0155 
0156 
if __name__ == '__main__':
    # Resolve the REST endpoint first, then build the workflow to submit.
    rest_host = get_rest_host()
    build_workflow = setup_workflow()

    manager = ClientManager(host=rest_host)
    # manager.set_original_user(user_name="wguandev")
    # Submit the workflow as a build request and show the returned value.
    print(manager.submit_build(build_workflow, use_dataset_name=False))