Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024
0010 
0011 
0012 """
0013 Test workflow.
0014 """
0015 
0016 import inspect     # noqa F401
0017 import logging
0018 import os          # noqa F401
0019 import shutil      # noqa F401
0020 import sys         # noqa F401
0021 
0022 # from nose.tools import assert_equal
0023 from idds.common.utils import setup_logging, run_process, json_dumps, json_loads, create_archive_file
0024 
0025 from idds.iworkflow.workflow import Workflow       # workflow
0026 from idds.iworkflow.work import work
0027 
0028 
# Initialise logging for this script through the iDDS helper, passing the
# module name.
# NOTE(review): the handlers/levels it installs are defined in
# idds.common.utils and are not visible here -- confirm there.
setup_logging(__name__)
0030 
0031 
@work
def test_func(name):
    """
    Print ``name`` between start/end markers and build a result string.

    The function is wrapped by the ``@work`` decorator imported from
    ``idds.iworkflow.work``.
    NOTE(review): how the decorator schedules the call and what the caller
    actually receives back is defined there, not in this file.

    :param name: any printable value.
    :returns: the string ``'test result: <name>'``.
    """
    print('test_func starts')
    print(name)
    print('test_func ends')
    # Wrap the value in a 1-tuple: with a bare ``% name`` a tuple argument is
    # unpacked as several format arguments (TypeError for len != 1, silently
    # unwrapped for len == 1) instead of being rendered as one value.
    return 'test result: %s' % (name,)
0038 
0039 
0040 def test_func1(name):
0041     print('test_func1 starts')
0042     print(name)
0043     print('test_func1 ends')
0044     return 'test result: %s' % name
0045 
0046 
def test_workflow():
    """
    Workflow body with a single step: call ``test_func`` with ``name='idds'``
    and print whatever it returns, bracketed by start/end markers.
    """
    print("test workflow starts")
    print(test_func(name='idds'))
    print("test workflow ends")
0052 
0053 
@work
def get_params():
    """
    Produce the parameter list ``[0, 1, ..., 9]``.

    :returns: a new list holding the integers 0 through 9.
    """
    return list(range(10))
0058 
0059 
def test_workflow_mulitple_work():
    """
    Workflow body chaining two steps: the result of ``get_params`` is passed
    to ``test_func`` and that step's result is printed, bracketed by
    start/end markers.
    """
    print("test workflow multiple work starts")
    outcome = test_func(get_params())
    print(outcome)
    print("test workflow multiple work ends")
0067 
0068 
def submit_workflow(wf):
    """
    Submit the workflow and report the request id.

    :param wf: object exposing a ``submit()`` method.
    :returns: the value returned by ``wf.submit()`` (previously it was only
              printed and then lost, so callers could not follow up on the
              request).
    """
    req_id = wf.submit()
    print("req id: %s" % req_id)
    return req_id
0072 
0073 
def run_workflow_wrapper(wf):
    """
    Run the workflow's runner command in a local process and wait for it.

    :param wf: object exposing ``get_runner()``, which supplies the command.
    :returns: the result of ``run_process(..., wait=True)``, logged here as
              the exit code.
    """
    runner_cmd = wf.get_runner()
    logging.info(f'To run workflow: {runner_cmd}')

    status = run_process(runner_cmd, wait=True)
    logging.info(f'Run workflow finished with exit code: {status}')
    return status
0081 
0082 
def run_workflow_remote_wrapper(wf):
    """
    Run the workflow through the downloaded ``run_workflow_wrapper`` script.

    Steps, in order:
      1. recreate the scratch directory ``/tmp/idds`` (previous content is
         removed) and make it the process working directory;
      2. call ``wf.setup_source_files()``;
      3. run one shell command line that downloads the wrapper script with
         ``wget``, marks it executable and runs it with the workflow's
         runner command appended as arguments.

    :param wf: object exposing ``get_runner()`` and ``setup_source_files()``.
    :returns: the result of ``run_process(..., wait=True)``, logged here as
              the exit code.
    """
    cmd = wf.get_runner()
    logging.info('To run workflow: %s' % cmd)

    work_dir = '/tmp/idds'
    # On the first run the scratch directory does not exist yet; a bare
    # rmtree would abort with FileNotFoundError before anything is executed.
    # Other failures (e.g. permissions) are still allowed to propagate.
    try:
        shutil.rmtree(work_dir)
    except FileNotFoundError:
        pass
    os.makedirs(work_dir)
    os.chdir(work_dir)
    logging.info("current dir: %s" % os.getcwd())

    # print(dir(wf))
    # print(inspect.getmodule(wf))
    # print(inspect.getfile(wf))
    setup = wf.setup_source_files()
    logging.info("setup: %s" % setup)

    # NOTE(review): the wrapper script is fetched over the network and
    # executed without any integrity check, and ``cmd`` is interpolated
    # unquoted into a shell command line. Acceptable only for a manual test
    # against a trusted workflow object.
    exc_cmd = 'cd %s' % work_dir
    exc_cmd += "; wget https://wguan-wisc.web.cern.ch/wguan-wisc/run_workflow_wrapper"
    exc_cmd += "; chmod +x run_workflow_wrapper; bash run_workflow_wrapper %s" % cmd
    logging.info("exc_cmd: %s" % exc_cmd)
    exit_code = run_process(exc_cmd, wait=True)
    logging.info(f'Run workflow finished with exit code: {exit_code}')
    return exit_code
0106 
0107 
def test_create_archive_file(wf):
    """
    Build an archive of the workflow's source directory under ``/tmp``.

    The archive name and the source directory are both read from the
    workflow's private ``_context``; the name and the resulting archive file
    are logged.

    :param wf: object whose ``_context`` exposes ``get_archive_name()`` and
               ``_source_dir``.
    """
    context = wf._context
    name = context.get_archive_name()
    src_dir = context._source_dir
    logging.info("archive_name :%s, source dir: %s" % (name, src_dir))

    created = create_archive_file('/tmp', name, [src_dir])
    logging.info("created archive file: %s" % created)
0114 
0115 
# Script entry point: build a Workflow around ``test_workflow``, round-trip it
# through the iDDS JSON helpers, prepare it and submit it.
if __name__ == '__main__':
    logging.info("start")
    # Work from the directory that contains this script.
    os.chdir(os.path.dirname(os.path.realpath(__file__)))
    # wf = Workflow(func=test_workflow, service='idds', distributed=False)
    wf = Workflow(func=test_workflow, service='idds')

    # Target queue and cloud for the submission.
    # NOTE(review): these look like site-specific names -- adjust for the
    # environment the test is run against.
    wf.queue = 'BNL_OSG_2'
    # wf.queue = 'FUNCX_TEST'
    wf.cloud = 'US'

    # Serialise and deserialise the workflow. ``wf_1`` is not used afterwards;
    # presumably this only checks that the round trip does not raise.
    wf_json = json_dumps(wf)
    # print(wf_json)
    wf_1 = json_loads(wf_json)

    # test_create_archive_file(wf)

    # sys.exit(0)

    logging.info("prepare workflow")
    wf.prepare()
    logging.info("prepared workflow")

    # Submit directly; the returned request id is not captured here.
    wf.submit()

    # Alternative ways of running the workflow, kept disabled:
    # logging.info("run_workflow_wrapper")
    # run_workflow_wrapper(wf)

    # logging.info("run_workflow_remote_wrapper")
    # run_workflow_remote_wrapper(wf)