Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024
0010 
0011 
0012 """
0013 Test workflow.
0014 """
0015 
0016 import inspect     # noqa F401
0017 import logging
0018 import os          # noqa F401
0019 import shutil      # noqa F401
0020 import sys         # noqa F401
0021 
0022 # from nose.tools import assert_equal
0023 from idds.common.utils import setup_logging, run_process, json_dumps, json_loads, create_archive_file
0024 
0025 from idds.iworkflow.workflow import Workflow       # workflow
0026 from idds.iworkflow.work import work
0027 
0028 
0029 setup_logging(__name__)
0030 
0031 
@work
def test_func(name, name1=None):
    """Print both arguments between start/end markers and return a summary.

    The function is wrapped by the ``work`` decorator imported from
    ``idds.iworkflow.work``.

    :param name: first value to print and include in the result.
    :param name1: optional second value to print and include in the result.
    :returns: the string ``'test result: <name>, <name1>'``.
    """
    print('test_func starts')
    # Echo the inputs in declaration order, one per line.
    for value in (name, name1):
        print(value)
    print('test_func ends')
    summary = 'test result: %s, %s' % (name, name1)
    return summary
0039 
0040 
@work(map_results=True)
def test_func1(name, name1=None):
    """Print both arguments between start/end markers and return a summary.

    Same body shape as ``test_func`` but registered through
    ``work(map_results=True)``.

    :param name: first value to print and include in the result.
    :param name1: optional second value to print and include in the result.
    :returns: the string ``'test result1: <name>, <name1>'``.
    """
    print('test_func1 starts')
    # Echo the inputs in declaration order, one per line.
    for value in (name, name1):
        print(value)
    print('test_func1 ends')
    summary = 'test result1: %s, %s' % (name, name1)
    return summary
0048 
0049 
def test_workflow():
    """Call ``test_func`` and then ``test_func1`` with the same job list.

    Each work function receives ``name='idds'`` plus a
    ``multi_jobs_kwargs_list`` holding two keyword-argument dicts, and
    whatever the call returns is printed.
    """
    print("test workflow starts")
    job_kwargs = [
        {'name': 1, 'name1': 2},
        {'name': 3, 'name1': 4},
    ]
    # Same arguments for both work functions; keep the original call order.
    for work_func in (test_func, test_func1):
        outcome = work_func(name='idds', multi_jobs_kwargs_list=job_kwargs)
        print(outcome)
    print("test workflow ends")
0060 
0061 
@work
def get_params():
    """Return the integers 0 through 9 as a list.

    The function is wrapped by the ``work`` decorator imported from
    ``idds.iworkflow.work``.

    :returns: ``[0, 1, ..., 9]``.
    """
    # list(range(...)) builds the same list without a pass-through
    # comprehension.
    return list(range(10))
0066 
0067 
def test_workflow_mulitple_work():
    """Feed the result of ``get_params`` into ``test_func`` and print it.

    The output of one work function is used as the single positional
    argument of the next one.
    """
    print("test workflow multiple work starts")
    # get_params() runs first; its return value becomes test_func's `name`.
    print(test_func(get_params()))
    print("test workflow multiple work ends")
0075 
0076 
def submit_workflow(wf):
    """Submit *wf* and print the value returned by ``wf.submit()``.

    :param wf: object exposing a ``submit()`` method.
    :returns: None; the submit result is only printed as ``req id: ...``.
    """
    print("req id: %s" % wf.submit())
0080 
0081 
def run_workflow_wrapper(wf):
    """Run the command returned by ``wf.get_runner()`` and wait for it.

    :param wf: object exposing a ``get_runner()`` method.
    :returns: whatever ``run_process(cmd, wait=True)`` returns; it is
              logged as the exit code.
    """
    runner_cmd = wf.get_runner()
    logging.info(f'To run workflow: {runner_cmd}')

    # wait=True is passed so the result is available for the final log line.
    status = run_process(runner_cmd, wait=True)
    logging.info(f'Run workflow finished with exit code: {status}')
    return status
0089 
0090 
def run_workflow_remote_wrapper(wf):
    """Run the workflow through a downloaded wrapper script in /tmp/idds.

    The working directory ``/tmp/idds`` is recreated from scratch, the
    process changes into it, ``wf.setup_source_files()`` is called, and a
    shell command is built that downloads ``run_workflow_wrapper`` with
    wget and runs it with the command from ``wf.get_runner()``.

    :param wf: object exposing ``get_runner()`` and ``setup_source_files()``.
    :returns: whatever ``run_process(exc_cmd, wait=True)`` returns; it is
              logged as the exit code.
    """
    cmd = wf.get_runner()
    logging.info('To run workflow: %s', cmd)

    work_dir = '/tmp/idds'
    # Start from an empty directory. On a first run (or after /tmp was
    # cleaned) the directory does not exist, which must not abort the test.
    try:
        shutil.rmtree(work_dir)
    except FileNotFoundError:
        pass
    os.makedirs(work_dir)
    os.chdir(work_dir)
    logging.info("current dir: %s" % os.getcwd())

    # print(dir(wf))
    # print(inspect.getmodule(wf))
    # print(inspect.getfile(wf))
    setup = wf.setup_source_files()
    logging.info("setup: %s" % setup)

    # NOTE(review): this downloads a script over the network and executes
    # it, and `cmd` is interpolated into the shell line unquoted. Acceptable
    # for a manual test helper only; confirm before reusing elsewhere.
    exc_cmd = 'cd %s' % work_dir
    exc_cmd += "; wget https://wguan-wisc.web.cern.ch/wguan-wisc/run_workflow_wrapper"
    exc_cmd += "; chmod +x run_workflow_wrapper; bash run_workflow_wrapper %s" % cmd
    logging.info("exc_cmd: %s" % exc_cmd)
    exit_code = run_process(exc_cmd, wait=True)
    logging.info(f'Run workflow finished with exit code: {exit_code}')
    return exit_code
0114 
0115 
def test_create_archive_file(wf):
    """Create an archive of the workflow's source directory under /tmp.

    The archive name and source directory are read from ``wf._context``
    and handed to ``create_archive_file``; both the inputs and the
    returned value are logged.

    :param wf: object whose ``_context`` provides ``get_archive_name()``
               and ``_source_dir``.
    """
    name = wf._context.get_archive_name()
    src_dir = wf._context._source_dir
    logging.info("archive_name :%s, source dir: %s" % (name, src_dir))

    created = create_archive_file('/tmp', name, [src_dir])
    logging.info("created archive file: %s" % created)
0122 
0123 
if __name__ == '__main__':
    logging.info("start")

    # Work from the directory that contains this script.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    os.chdir(script_dir)

    # Alternative: wf = Workflow(func=test_workflow, service='idds', distributed=False)
    wf = Workflow(func=test_workflow, service='idds')

    # Target queue and cloud; alternative queue: 'BNL_OSG_2'.
    wf.queue = 'FUNCX_TEST'
    wf.cloud = 'US'

    # Round-trip the workflow through the JSON helpers.
    wf_json = json_dumps(wf)
    # print(wf_json)
    wf_1 = json_loads(wf_json)

    # Optional manual checks:
    # test_create_archive_file(wf)
    # sys.exit(0)

    logging.info("prepare workflow")
    wf.prepare()
    logging.info("prepared workflow")

    wf.submit()

    # Optional local / remote runs instead of relying on submit() alone:
    # logging.info("run_workflow_wrapper")
    # run_workflow_wrapper(wf)

    # logging.info("run_workflow_remote_wrapper")
    # run_workflow_remote_wrapper(wf)