File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test workflow.
0014 """
0015
0016 import inspect
0017 import logging
0018 import os
0019 import shutil
0020 import sys
0021
0022
0023 from idds.common.utils import setup_logging, run_process, json_dumps, json_loads, create_archive_file
0024
0025 from idds.iworkflow.workflow import Workflow
0026 from idds.iworkflow.work import work
0027
0028
0029 setup_logging(__name__)
0030
0031
0032 @work
0033 def test_func(name, name1=None):
0034 print('test_func starts')
0035 print(name)
0036 print(name1)
0037 print('test_func ends')
0038 return 'test result: %s, %s' % (name, name1)
0039
0040
0041 @work(map_results=True)
0042 def test_func1(name, name1=None):
0043 print('test_func1 starts')
0044 print(name)
0045 print(name1)
0046 print('test_func1 ends')
0047 return 'test result1: %s, %s' % (name, name1)
0048
0049
0050 def test_workflow():
0051 print("test workflow starts")
0052 multi_jobs_kwargs_list = [{'name': 1, 'name1': 2},
0053 {'name': 3, 'name1': 4}]
0054 ret = test_func(name='idds', multi_jobs_kwargs_list=multi_jobs_kwargs_list)
0055 print(ret)
0056
0057 ret = test_func1(name='idds', multi_jobs_kwargs_list=multi_jobs_kwargs_list)
0058 print(ret)
0059 print("test workflow ends")
0060
0061
0062 @work
0063 def get_params():
0064 list_params = [i for i in range(10)]
0065 return list_params
0066
0067
0068 def test_workflow_mulitple_work():
0069 print("test workflow multiple work starts")
0070 list_params = get_params()
0071
0072 ret = test_func(list_params)
0073 print(ret)
0074 print("test workflow multiple work ends")
0075
0076
0077 def submit_workflow(wf):
0078 req_id = wf.submit()
0079 print("req id: %s" % req_id)
0080
0081
0082 def run_workflow_wrapper(wf):
0083 cmd = wf.get_runner()
0084 logging.info(f'To run workflow: {cmd}')
0085
0086 exit_code = run_process(cmd, wait=True)
0087 logging.info(f'Run workflow finished with exit code: {exit_code}')
0088 return exit_code
0089
0090
0091 def run_workflow_remote_wrapper(wf):
0092 cmd = wf.get_runner()
0093 logging.info('To run workflow: %s' % cmd)
0094
0095 work_dir = '/tmp/idds'
0096 shutil.rmtree(work_dir)
0097 os.makedirs(work_dir)
0098 os.chdir(work_dir)
0099 logging.info("current dir: %s" % os.getcwd())
0100
0101
0102
0103
0104 setup = wf.setup_source_files()
0105 logging.info("setup: %s" % setup)
0106
0107 exc_cmd = 'cd %s' % work_dir
0108 exc_cmd += "; wget https://wguan-wisc.web.cern.ch/wguan-wisc/run_workflow_wrapper"
0109 exc_cmd += "; chmod +x run_workflow_wrapper; bash run_workflow_wrapper %s" % cmd
0110 logging.info("exc_cmd: %s" % exc_cmd)
0111 exit_code = run_process(exc_cmd, wait=True)
0112 logging.info(f'Run workflow finished with exit code: {exit_code}')
0113 return exit_code
0114
0115
0116 def test_create_archive_file(wf):
0117 archive_name = wf._context.get_archive_name()
0118 source_dir = wf._context._source_dir
0119 logging.info("archive_name :%s, source dir: %s" % (archive_name, source_dir))
0120 archive_file = create_archive_file('/tmp', archive_name, [source_dir])
0121 logging.info("created archive file: %s" % archive_file)
0122
0123
0124 if __name__ == '__main__':
0125 logging.info("start")
0126 os.chdir(os.path.dirname(os.path.realpath(__file__)))
0127
0128 wf = Workflow(func=test_workflow, service='idds')
0129
0130
0131 wf.queue = 'FUNCX_TEST'
0132 wf.cloud = 'US'
0133
0134 wf_json = json_dumps(wf)
0135
0136 wf_1 = json_loads(wf_json)
0137
0138
0139
0140
0141
0142 logging.info("prepare workflow")
0143 wf.prepare()
0144 logging.info("prepared workflow")
0145
0146 wf.submit()
0147
0148
0149
0150
0151
0152