Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2022 - 2023
0010 
0011 
0012 """
0013 Test client.
0014 """
0015 
0016 import os
0017 import tarfile
0018 import time
0019 import uuid
0020 
0021 # from pandaclient import Client
0022 
0023 # import traceback
0024 
0025 # from rucio.client.client import Client as Rucio_Client
0026 # from rucio.common.exception import CannotAuthenticate
0027 
0028 # from idds.client.client import Client
0029 # from idds.client.clientmanager import ClientManager
0030 # from idds.common.constants import RequestType, RequestStatus
0031 # from idds.common.utils import get_rest_host
0032 # from idds.tests.common import get_example_real_tape_stagein_request
0033 # from idds.tests.common import get_example_prodsys2_tape_stagein_request
0034 
0035 # from idds.workflowv2.work import Work, Parameter, WorkStatus
0036 # from idds.workflowv2.workflow import Condition, Workflow
0037 from idds.workflowv2.workflow import Workflow
0038 # from idds.atlas.workflowv2.atlasstageinwork import ATLASStageinWork
0039 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0040 
0041 import idds.common.utils as idds_utils
0042 import pandaclient.idds_api
0043 
0044 
# Cloud handed to DomaPanDAWork as ``task_cloud`` in setup_workflow().
# Alternative value: 'LSST'
task_cloud = 'US'

# Queue handed to DomaPanDAWork as ``task_queue`` in setup_workflow().
# Alternative values: 'DOMA_LSST_GOOGLE_TEST', 'DOMA_LSST_GOOGLE_MERGE',
# 'SLAC_TEST', 'DOMA_LSST_SLAC_TEST', 'CC-IN2P3_TEST'
task_queue = 'SLAC_Rubin'


# PanDA endpoints exported through the environment at import time.
# The cache URL reuses the SSL endpoint.
os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda'
os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
0059 
0060 
def get_local_pfns():
    """
    Collect the local paths that make up the build sandbox.

    The returned list holds, in order: the test scripts found next to this
    file (missing ones are reported and skipped), the iDDS source
    directories, and the two Rubin helper scripts.

    :returns: list of file and directory paths.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    pfns = []

    # test scripts that live beside this module
    for script in ('test_domapanda.py',
                   'test_domapanda_build.py',
                   'test_domapanda_build_pandaclient.py'):
        candidate = os.path.join(here, script)
        if not os.path.exists(candidate):
            print("Not exist: %s" % candidate)
            continue
        print("Adding: %s" % candidate)
        pfns.append(candidate)

    # add iDDS client code, to be compiled on worker node.
    # The base directory sits four levels above this file's directory.
    root = here
    for _ in range(4):
        root = os.path.dirname(root)
    pfns.extend(os.path.join(root, pkg)
                for pkg in ('client', 'common', 'workflow', 'doma'))

    # add main test script (these paths are appended without an existence check)
    pfns.extend(os.path.join(root, rel)
                for rel in ('main/tools/rubin/cmdline_builder.py',
                            'main/tools/rubin/test_build.sh'))
    return pfns
0085 
0086 
def create_archive_file(output_filename, local_pfns, work_dir='/tmp/wguan'):
    """
    Pack files and directories into a gzip-compressed tar archive.

    Every entry is stored under its base name, so the archive is flat at the
    top level (directories keep their own content below that name).
    Symbolic links are followed (``dereference=True``).

    :param output_filename: archive path; a relative path is placed
                            under ``work_dir``.
    :param local_pfns: iterable of file or directory paths to add.
    :param work_dir: directory used for relative ``output_filename`` values.

    :returns: the full path of the archive that was written.
    """
    if not os.path.isabs(output_filename):
        output_filename = os.path.join(work_dir, output_filename)

    # Make sure the target directory exists; tarfile.open() does not create it.
    parent_dir = os.path.dirname(output_filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with tarfile.open(output_filename, "w:gz", dereference=True) as tar:
        for local_pfn in local_pfns:
            # normpath drops a trailing separator, which would otherwise
            # make basename() return an empty archive name.
            arcname = os.path.basename(os.path.normpath(local_pfn))
            tar.add(local_pfn, arcname=arcname)
    return output_filename
0096 
0097 
def copy_files_to_pandacache(filename):
    """
    Upload a file with the PanDA client and return its cache URL.

    :param filename: local path of the file passed to ``Client.putFile``.

    :returns: "<PANDACACHE_URL>/cache/<file name>", or None when the reply
              is neither "True" nor a "NewFileName:" message.
    """
    from pandaclient import Client

    status, reply = Client.putFile(filename, True)
    print("copy_files_to_pandacache: status: %s, out: %s" % (status, reply))

    reuse_marker = "NewFileName:"
    reused = reply.startswith(reuse_marker)
    if not reused and reply != "True":
        print(reply)
        return None
    if reused:
        # found the same input sandbox to reuse: take the name after the last colon
        filename = reply.rsplit(":", 1)[-1]

    return os.path.join(os.environ["PANDACACHE_URL"], 'cache', os.path.basename(filename))
0113 
0114 
def setup_workflow():
    """
    Build a one-task workflow that runs the Rubin build test.

    The function archives the local sandbox files, uploads the archive with
    copy_files_to_pandacache(), composes the shell command line and wraps it
    in a single DomaPanDAWork inside a Workflow.

    :returns: the Workflow holding the build task.
    :raises RuntimeError: when the sandbox upload does not return a location.
    """
    local_pfns = get_local_pfns()
    output_filename = "jobO.%s.tar.gz" % str(uuid.uuid4())
    archive_file = create_archive_file(output_filename, local_pfns)
    print("archive file: %s" % archive_file)
    output_filename = copy_files_to_pandacache(archive_file)
    print("pandacache file: %s" % output_filename)
    if output_filename is None:
        # copy_files_to_pandacache() returns None on an unexpected reply;
        # stop here instead of failing later on a str + None concatenation.
        raise RuntimeError("Failed to upload %s to the PanDA cache" % archive_file)

    # Load the LSST software stack.
    executable = 'unset PYTHONPATH; source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/w_2023_41/loadLSST.bash; pwd; ls -al; setup lsst_distrib;'

    # Choose OIDC token or x509 proxy settings from the environment, then
    # export the PanDA client settings. The text is a runtime shell string:
    # its embedded newlines and indentation are kept as they are.
    executable += """if [[ ! -z "${PANDA_AUTH_DIR}" ]] && [[ ! -z "${PANDA_AUTH_ORIGIN}" ]];
        then export PANDA_AUTH_ID_TOKEN=$(cat $PANDA_AUTH_DIR);
        export PANDA_AUTH_VO=$PANDA_AUTH_ORIGIN;
        export IDDS_OIDC_TOKEN=$(cat $PANDA_AUTH_DIR);
        export IDDS_VO=$PANDA_AUTH_ORIGIN;
        export PANDA_AUTH=oidc;
        else unset PANDA_AUTH;
        export IDDS_AUTH_TYPE=x509_proxy; fi;
        export PANDA_CONFIG_ROOT=$(pwd);
        export PANDA_VERIFY_HOST=off;
        export PANDA_SYS=$CONDA_PREFIX;
        export PANDA_URL_SSL=${PANDA_SERVER_URL}/server/panda;
        export PANDACACHE_URL=$PANDA_URL_SSL;
        export PANDA_URL=$PANDA_URL_SSL;
        export PANDA_BEHIND_REAL_LB=true;
        """
    # Fetch the command-line builder and run it on the uploaded sandbox.
    executable += 'wget https://wguan-wisc.web.cern.ch/wguan-wisc/cmdline_builder.py;'
    executable += 'export IDDS_HOST=https://aipanda015.cern.ch:443/idds;'
    executable += 'python3 cmdline_builder.py '

    executable += output_filename + ' ' + './test_build.sh'

    work1 = DomaPanDAWork(executable=executable,
                          task_type='lsst_build',
                          primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                          output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
                          log_collections=[], dependency_map=None,
                          task_name="build_task", task_queue=task_queue,
                          encode_command_line=True,
                          prodSourceLabel='managed',
                          task_log={"dataset": "PandaJob_#{pandaid}/",
                                    "destination": "local",
                                    "param_type": "log",
                                    "token": "local",
                                    "type": "template",
                                    "value": "log.tgz"},
                          task_cloud=task_cloud)

    pending_time = 12
    # pending_time = None
    workflow = Workflow(pending_time=pending_time)

    workflow.add_work(work1)
    # The current timestamp is embedded in the workflow name.
    workflow.name = 'test_workflow.idds.%s.test' % time.time()
    return workflow
0176 
0177 
def submit(workflow, idds_server=None):
    """
    Submit a build workflow through the PanDA iDDS API.

    :param workflow: workflow object handed to ``submit_build``.
    :param idds_server: value passed as ``idds_host`` to ``get_api``.

    :returns: the request id returned by ``submit_build``.
    """
    c = pandaclient.idds_api.get_api(idds_utils.json_dumps,
                                     idds_host=idds_server, compress=True, manager=True)
    request_id = c.submit_build(workflow, use_dataset_name=False)
    # Format with %: passing the id as a second print() argument left the
    # literal "%s" placeholder in the output.
    print("Submitted into iDDs with request id=%s" % str(request_id))
    return request_id
0184 
0185 
if __name__ == '__main__':
    # Script entry point: build the test workflow and submit it.
    submit(setup_workflow())