File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test workflow.
0014 """
0015
0016 import inspect
0017 import logging
0018 import os
0019 import shutil
0020 import sys
0021
0022
0023 from idds.common.utils import setup_logging, run_process, json_dumps, json_loads, create_archive_file
0024
0025 from idds.iworkflow.workflow import Workflow
0026 from idds.iworkflow.work import work
0027
0028
0029 setup_logging(__name__)
0030
0031
0032 @work
0033 def test_func(name):
0034 print('test_func starts')
0035 print(name)
0036 print('test_func ends')
0037 return 'test result: %s' % name
0038
0039
0040 def test_func1(name):
0041 print('test_func1 starts')
0042 print(name)
0043 print('test_func1 ends')
0044 return 'test result: %s' % name
0045
0046
0047 def test_workflow():
0048 print("test workflow starts")
0049 ret = test_func(name='idds')
0050 print(ret)
0051 print("test workflow ends")
0052
0053
0054 @work
0055 def get_params():
0056 list_params = [i for i in range(10)]
0057 return list_params
0058
0059
0060 def test_workflow_mulitple_work():
0061 print("test workflow multiple work starts")
0062 list_params = get_params()
0063
0064 ret = test_func(list_params)
0065 print(ret)
0066 print("test workflow multiple work ends")
0067
0068
0069 def submit_workflow(wf):
0070 req_id = wf.submit()
0071 print("req id: %s" % req_id)
0072
0073
0074 def run_workflow_wrapper(wf):
0075 cmd = wf.get_runner()
0076 logging.info(f'To run workflow: {cmd}')
0077
0078 exit_code = run_process(cmd, wait=True)
0079 logging.info(f'Run workflow finished with exit code: {exit_code}')
0080 return exit_code
0081
0082
0083 def run_workflow_remote_wrapper(wf):
0084 cmd = wf.get_runner()
0085 logging.info('To run workflow: %s' % cmd)
0086
0087 work_dir = '/tmp/idds'
0088 shutil.rmtree(work_dir)
0089 os.makedirs(work_dir)
0090 os.chdir(work_dir)
0091 logging.info("current dir: %s" % os.getcwd())
0092
0093
0094
0095
0096 setup = wf.setup_source_files()
0097 logging.info("setup: %s" % setup)
0098
0099 exc_cmd = 'cd %s' % work_dir
0100 exc_cmd += "; wget https://wguan-wisc.web.cern.ch/wguan-wisc/run_workflow_wrapper"
0101 exc_cmd += "; chmod +x run_workflow_wrapper; bash run_workflow_wrapper %s" % cmd
0102 logging.info("exc_cmd: %s" % exc_cmd)
0103 exit_code = run_process(exc_cmd, wait=True)
0104 logging.info(f'Run workflow finished with exit code: {exit_code}')
0105 return exit_code
0106
0107
0108 def test_create_archive_file(wf):
0109 archive_name = wf._context.get_archive_name()
0110 source_dir = wf._context._source_dir
0111 logging.info("archive_name :%s, source dir: %s" % (archive_name, source_dir))
0112 archive_file = create_archive_file('/tmp', archive_name, [source_dir])
0113 logging.info("created archive file: %s" % archive_file)
0114
0115
0116 if __name__ == '__main__':
0117 logging.info("start")
0118 os.chdir(os.path.dirname(os.path.realpath(__file__)))
0119
0120 wf = Workflow(func=test_workflow, service='idds')
0121
0122 wf.queue = 'BNL_OSG_2'
0123
0124 wf.cloud = 'US'
0125
0126 wf_json = json_dumps(wf)
0127
0128 wf_1 = json_loads(wf_json)
0129
0130
0131
0132
0133
0134 logging.info("prepare workflow")
0135 wf.prepare()
0136 logging.info("prepared workflow")
0137
0138 wf.submit()
0139
0140
0141
0142
0143
0144