File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test client.
0014 """
0015
0016 import os
0017 import tarfile
0018 import time
0019 import uuid
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029 from idds.client.clientmanager import ClientManager
0030
0031 from idds.common.utils import get_rest_host
0032
0033
0034
0035
0036
0037 from idds.workflowv2.workflow import Workflow
0038
0039 from idds.doma.workflowv2.domapandawork import DomaPanDAWork
0040
0041
# Cloud and PanDA queue that the build task is submitted to; both are passed
# to DomaPanDAWork in setup_workflow().
task_cloud = 'US'

# NOTE: 'DOMA_LSST_GOOGLE_TEST' used to be assigned here and was then
# unconditionally overwritten, so only the effective value is kept.
task_queue = 'SLAC_Rubin'

# PanDA server endpoints, published through the process environment.
# PANDACACHE_URL is read back by copy_files_to_pandacache(); the other two are
# presumably consumed by the PanDA client library (not visible in this file).
os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda'
# The cache endpoint reuses the SSL server URL.
os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
0055
0056
def get_local_pfns():
    """Return the list of local paths to pack into the build archive.

    The list contains, in order:

    * the test scripts located next to this file, but only those that exist
      (a line is printed for each one, found or not);
    * the ``client``, ``common``, ``workflow`` and ``doma`` directories found
      four directory levels above this file's directory;
    * the ``cmdline_builder.py`` and ``test_build.sh`` tools under
      ``main/tools/rubin`` of that same base directory.

    Only the test scripts are checked for existence; the remaining paths are
    returned unconditionally.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    collected = []

    for script_name in ('test_domapanda.py', 'test_domapanda_build.py'):
        candidate = os.path.join(here, script_name)
        if not os.path.exists(candidate):
            print("Not exist: %s" % candidate)
            continue
        print("Adding: %s" % candidate)
        collected.append(candidate)

    # Walk four directory levels up from this file's directory.
    top_dir = here
    for _ in range(4):
        top_dir = os.path.dirname(top_dir)

    collected.extend(os.path.join(top_dir, package)
                     for package in ('client', 'common', 'workflow', 'doma'))
    collected.extend(os.path.join(top_dir, tool)
                     for tool in ('main/tools/rubin/cmdline_builder.py',
                                  'main/tools/rubin/test_build.sh'))
    return collected
0080
0081
def create_archive_file(output_filename, local_pfns):
    """Pack local files and directories into a gzip-compressed tar archive.

    Args:
        output_filename: Path of the archive to write. A name that does not
            start with "/" is placed under ``/tmp/wguan``.
        local_pfns: Iterable of file or directory paths to add. Each entry is
            stored under its base name only, so the leading directories of
            the source paths do not appear in the archive.

    Returns:
        The path of the archive that was written.
    """
    if not output_filename.startswith("/"):
        output_filename = os.path.join('/tmp/wguan', output_filename)

    # tarfile.open() does not create missing parent directories, so make sure
    # the destination exists before opening the archive for writing.
    os.makedirs(os.path.dirname(output_filename), exist_ok=True)

    # dereference=True archives the target of a symlink instead of the link.
    with tarfile.open(output_filename, "w:gz", dereference=True) as tar:
        for local_pfn in local_pfns:
            tar.add(local_pfn, arcname=os.path.basename(local_pfn))
    return output_filename
0091
0092
def copy_files_to_pandacache(filename):
    """Upload a file with the PanDA client and return its cache URL.

    The client's reply decides the outcome:

    * a reply starting with ``"NewFileName:"`` replaces ``filename`` with the
      text after the last ``":"`` (presumably the name of an already cached
      copy that the server wants reused -- confirm against the PanDA client);
    * a reply equal to ``"True"`` keeps ``filename`` as given;
    * any other reply is printed and ``None`` is returned.

    Args:
        filename: Path of the local file to upload.

    Returns:
        ``$PANDACACHE_URL/cache/<base name>`` on success, otherwise ``None``.
    """
    # Imported lazily so the module can be loaded without the PanDA client.
    from pandaclient import Client

    status, out = Client.putFile(filename, True)
    print("copy_files_to_pandacache: status: %s, out: %s" % (status, out))

    renamed = out.startswith("NewFileName:")
    if not renamed and out != "True":
        print(out)
        return None

    if renamed:
        filename = out.split(":")[-1]

    return os.path.join(os.environ["PANDACACHE_URL"], 'cache',
                        os.path.basename(filename))
0108
0109
def setup_workflow():
    """Create the one-task workflow that runs the Rubin build test.

    The local sources are archived, uploaded to the PanDA cache, and the
    resulting URL is handed to ``cmdline_builder.py`` on the command line of a
    single ``DomaPanDAWork`` task.

    Returns:
        A ``Workflow`` containing the build task, named with the current
        timestamp.

    Raises:
        RuntimeError: If the archive upload did not produce a cache URL.
    """
    local_pfns = get_local_pfns()
    archive_path = create_archive_file("jobO.%s.tar.gz" % str(uuid.uuid4()), local_pfns)
    print("archive file: %s" % archive_path)
    output_filename = copy_files_to_pandacache(archive_path)
    print("pandacache file: %s" % output_filename)

    # copy_files_to_pandacache() returns None when the upload is rejected;
    # fail with a clear message instead of a TypeError on the concatenation
    # further down.
    if output_filename is None:
        raise RuntimeError("Failed to upload %s to the PanDA cache" % archive_path)

    # Shell command line executed by the job: set up the LSST software stack ...
    executable = 'unset PYTHONPATH; source /cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib/w_2022_53/loadLSST.bash; pwd; ls -al; setup lsst_distrib;'

    # ... export OIDC token settings when both PANDA_AUTH_* variables are set,
    # otherwise select x509 proxy authentication ...
    executable += 'if [[ ! -z "${PANDA_AUTH_DIR}" ]] && [[ ! -z "${PANDA_AUTH_ORIGIN}" ]]; then export PANDA_AUTH_ID_TOKEN=$(cat $PANDA_AUTH_DIR); export PANDA_AUTH_VO=$PANDA_AUTH_ORIGIN; export IDDS_OIDC_TOKEN=$(cat $PANDA_AUTH_DIR); export IDDS_VO=$PANDA_AUTH_ORIGIN; export PANDA_AUTH=oidc; else unset PANDA_AUTH; export IDDS_AUTH_TYPE=x509_proxy; fi;'

    # ... then download the command-line builder and run it with the uploaded
    # archive URL and the build script as arguments.
    executable += 'wget https://wguan-wisc.web.cern.ch/wguan-wisc/cmdline_builder.py;'
    executable += 'export IDDS_HOST=https://aipanda160.cern.ch:443/idds;'
    executable += 'python3 cmdline_builder.py '
    executable += output_filename + ' ' + './test_build.sh'

    work1 = DomaPanDAWork(executable=executable,
                          task_type='lsst_build',
                          primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                          output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
                          log_collections=[], dependency_map=None,
                          task_name="build_task", task_queue=task_queue,
                          encode_command_line=True,
                          prodSourceLabel='managed',
                          task_log={"dataset": "PandaJob_#{pandaid}/",
                                    "destination": "local",
                                    "param_type": "log",
                                    "token": "local",
                                    "type": "template",
                                    "value": "log.tgz"},
                          task_cloud=task_cloud)

    workflow = Workflow(pending_time=12)
    workflow.add_work(work1)
    workflow.name = 'test_workflow.idds.%s.test' % time.time()
    return workflow
0155
0156
if __name__ == '__main__':
    # Resolve the iDDS REST endpoint before building the workflow, so a
    # failure to resolve it surfaces before anything is archived or uploaded.
    rest_host = get_rest_host()
    build_workflow = setup_workflow()

    # Submit the build workflow and print the value returned by the client.
    client = ClientManager(host=rest_host)
    print(client.submit_build(build_workflow, use_dataset_name=False))