File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Test workflow.
0014 """
0015
0016 import inspect
0017 import logging
0018 import os
0019 import shutil
0020 import sys
0021
0022
0023 from idds.common.utils import setup_logging, run_process, json_dumps, json_loads, create_archive_file
0024
0025 from idds.iworkflow.workflow import Workflow, workflow
0026 from idds.iworkflow.work import work
0027
0028
# Set up logging for this script at import time, before any workflow code
# runs. NOTE(review): presumably this configures the handlers/level used by
# the logging.info calls in this file -- confirm in idds.common.utils.
setup_logging(__name__)
0030
0031
@work
def test_func(name):
    """Echo *name* between start/end markers and build a result string.

    The function is wrapped by the ``@work`` decorator; what the decorated
    call hands back to its caller is decided by that decorator.

    :param name: value to echo; any printable object.
    :returns: the string ``'test result: <name>'`` (from the undecorated body).
    """
    print('test_func starts')
    print(name)
    print('test_func ends')
    # Wrap in a 1-tuple so a tuple-valued ``name`` is formatted as a whole
    # instead of being unpacked as several ``%`` arguments (TypeError).
    return 'test result: %s' % (name,)
0038
0039
0040 def test_func1(name):
0041 print('test_func1 starts')
0042 print(name)
0043 print('test_func1 ends')
0044 return 'test result: %s' % name
0045
0046
0047
@workflow(service='panda', local=True, cloud='US', queue='BNL_OSG_2')
def test_workflow():
    """Single-step workflow: call ``test_func`` once and print what it gives back."""
    print("test workflow starts")
    print(test_func(name='idds'))
    print("test workflow ends")
0054
0055
@work
def get_params():
    """Return the integers 0 through 9 as a list."""
    return list(range(10))
0060
0061
def test_workflow_mulitple_work():
    """Pass the output of ``get_params`` to ``test_func`` and print the result."""
    print("test workflow multiple work starts")
    params = get_params()

    outcome = test_func(params)
    print(outcome)
    print("test workflow multiple work ends")
0069
0070
def submit_workflow(wf):
    """Submit a workflow and report the request id.

    :param wf: object exposing a ``submit()`` method.
    :returns: the value returned by ``wf.submit()``, so callers can use the
        request id instead of only seeing it on stdout.
    """
    req_id = wf.submit()
    print("req id: %s" % req_id)
    return req_id
0074
0075
def run_workflow_wrapper(wf):
    """Run the workflow's runner command and return its exit code.

    :param wf: object exposing ``get_runner()``, which supplies the command.
    :returns: the value ``run_process`` returns for that command.
    """
    runner_cmd = wf.get_runner()
    logging.info(f'To run workflow: {runner_cmd}')

    status = run_process(runner_cmd, wait=True)
    logging.info(f'Run workflow finished with exit code: {status}')
    return status
0083
0084
def run_workflow_remote_wrapper(wf):
    """Run a workflow through the downloaded ``run_workflow_wrapper`` script.

    Steps, in order:

    1. Recreate an empty ``/tmp/idds`` and make it the current directory.
    2. Call ``wf.setup_source_files()`` and log what it returns.
    3. In a shell: download ``run_workflow_wrapper`` with ``wget``, make it
       executable and run it with the workflow's runner command appended.

    The process's working directory remains ``/tmp/idds`` after the call.

    :param wf: object exposing ``get_runner()`` and ``setup_source_files()``.
    :returns: the value ``run_process`` returns for the shell command.
    """
    cmd = wf.get_runner()
    logging.info('To run workflow: %s' % cmd)

    work_dir = '/tmp/idds'
    # Start from an empty directory. On a first run the directory does not
    # exist yet; that is fine -- only a stale one needs removing.
    try:
        shutil.rmtree(work_dir)
    except FileNotFoundError:
        pass
    os.makedirs(work_dir)
    os.chdir(work_dir)
    logging.info("current dir: %s" % os.getcwd())

    setup = wf.setup_source_files()
    logging.info("setup: %s" % setup)

    # NOTE(review): this downloads a script from a remote host and executes
    # it without any integrity check -- acceptable for a manual test only.
    exc_cmd = 'cd %s' % work_dir
    exc_cmd += "; wget https://wguan-wisc.web.cern.ch/wguan-wisc/run_workflow_wrapper"
    exc_cmd += "; chmod +x run_workflow_wrapper; bash run_workflow_wrapper %s" % cmd
    logging.info("exc_cmd: %s" % exc_cmd)
    exit_code = run_process(exc_cmd, wait=True)
    logging.info(f'Run workflow finished with exit code: {exit_code}')
    return exit_code
0108
0109
def test_create_archive_file(wf):
    """Build an archive of the workflow's source directory and log its path.

    Reads the archive name and source directory from ``wf._context`` and
    passes ``'/tmp'``, that name and a one-element list holding the source
    directory to ``create_archive_file``.

    :param wf: workflow object whose ``_context`` provides
        ``get_archive_name()`` and ``_source_dir``.
    """
    name = wf._context.get_archive_name()
    src = wf._context._source_dir
    logging.info("archive_name :%s, source dir: %s" % (name, src))
    tarball = create_archive_file('/tmp', name, [src])
    logging.info("created archive file: %s" % tarball)
0116
0117
if __name__ == '__main__':
    logging.info("start")
    # Switch the working directory to the folder containing this script
    # (symlinks resolved) before launching the workflow.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    os.chdir(script_dir)
    result = test_workflow()
    print('result: %s' % str(result))
0122 print('result: %s' % str(ret))