Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2024
0010 
0011 
0012 """
0013 Test workflow.
0014 """
0015 
0016 import inspect     # noqa F401
0017 import logging
0018 import os          # noqa F401
0019 import shutil      # noqa F401
0020 import sys         # noqa F401
0021 
0022 # from nose.tools import assert_equal
0023 from idds.common.utils import setup_logging, run_process, json_dumps, json_loads, create_archive_file    # noqa F401
0024 
0025 from idds.iworkflow.workflow import Workflow, workflow       # workflow    # noqa F401
0026 from idds.iworkflow.work import work
0027 
0028 
# Initialise logging for this module using the setup_logging helper
# imported from idds.common.utils.
setup_logging(__name__)
0030 
0031 
@work
def test_func(name):
    """Print start/end markers around ``name`` and return a result string.

    The function is wrapped by the ``@work`` decorator imported from
    ``idds.iworkflow.work``.

    :param name: value that is printed and interpolated into the result.
    :returns: the string ``'test result: %s' % name``.
    """
    for message in ('test_func starts', name, 'test_func ends'):
        print(message)
    result = 'test result: %s' % name
    return result
0038 
0039 
0040 def test_func1(name):
0041     print('test_func1 starts')
0042     print(name)
0043     print('test_func1 ends')
0044     return 'test result: %s' % name
0045 
0046 
0047 # @workflow(service='idds', local=True, cloud='US', queue='FUNCX_TEST')   # queue = 'BNL_OSG_2'
@workflow(service='panda', local=True, cloud='US', queue='BNL_OSG_2')
def test_workflow():
    """Call ``test_func`` once with ``name='idds'`` and print what it returns.

    Start and end markers are printed around the call. The function itself
    has no explicit return value.
    """
    print("test workflow starts")
    print(test_func(name='idds'))
    print("test workflow ends")
0054 
0055 
@work
def get_params():
    """Return the integers 0 through 9 as a list.

    The function is wrapped by the ``@work`` decorator imported from
    ``idds.iworkflow.work``.
    """
    return list(range(10))
0060 
0061 
def test_workflow_mulitple_work():
    """Feed the result of ``get_params`` to ``test_func`` and print the outcome.

    Start and end markers are printed around the two calls. The function
    itself has no explicit return value.
    """
    print("test workflow multiple work starts")
    params = get_params()
    print(test_func(params))
    print("test workflow multiple work ends")
0069 
0070 
def submit_workflow(wf):
    """Submit ``wf`` and print the request id that ``wf.submit()`` returns.

    :param wf: object exposing a ``submit()`` method.
    """
    print("req id: %s" % wf.submit())
0074 
0075 
def run_workflow_wrapper(wf):
    """Run the command returned by ``wf.get_runner()`` and return its result.

    :param wf: object exposing a ``get_runner()`` method.
    :returns: the value ``run_process`` returns for the command, which is
        logged as the exit code.
    """
    runner_cmd = wf.get_runner()
    logging.info(f'To run workflow: {runner_cmd}')

    status = run_process(runner_cmd, wait=True)
    logging.info(f'Run workflow finished with exit code: {status}')
    return status
0083 
0084 
def run_workflow_remote_wrapper(wf):
    """Run the workflow through the downloaded ``run_workflow_wrapper`` script.

    Recreates the scratch directory ``/tmp/idds`` as an empty directory,
    changes the process working directory into it, calls
    ``wf.setup_source_files()``, and then runs a shell command line that
    downloads ``run_workflow_wrapper`` with ``wget`` and executes it with the
    command returned by ``wf.get_runner()``.

    :param wf: object exposing ``get_runner()`` and ``setup_source_files()``.
    :returns: the value ``run_process`` returns for the composed command,
        which is logged as the exit code.
    """
    cmd = wf.get_runner()
    logging.info('To run workflow: %s', cmd)

    work_dir = '/tmp/idds'
    # Start from an empty directory. On a first run the directory does not
    # exist yet, which must not abort the whole run.
    try:
        shutil.rmtree(work_dir)
    except FileNotFoundError:
        pass
    os.makedirs(work_dir)
    os.chdir(work_dir)
    logging.info("current dir: %s", os.getcwd())

    setup = wf.setup_source_files()
    logging.info("setup: %s", setup)

    # NOTE(review): cmd is interpolated unquoted into a shell command line and
    # the wrapper script is fetched from the network on every call -- confirm
    # that cmd always comes from a trusted source.
    exc_cmd = "; ".join([
        'cd %s' % work_dir,
        "wget https://wguan-wisc.web.cern.ch/wguan-wisc/run_workflow_wrapper",
        "chmod +x run_workflow_wrapper",
        "bash run_workflow_wrapper %s" % cmd,
    ])
    logging.info("exc_cmd: %s", exc_cmd)
    exit_code = run_process(exc_cmd, wait=True)
    logging.info('Run workflow finished with exit code: %s', exit_code)
    return exit_code
0108 
0109 
def test_create_archive_file(wf):
    """Create an archive of the workflow's source directory under ``/tmp``.

    The archive name and source directory are read from ``wf._context``;
    both they and the value returned by ``create_archive_file`` are logged.

    :param wf: object whose ``_context`` exposes ``get_archive_name()`` and
        ``_source_dir``.
    """
    name = wf._context.get_archive_name()
    src = wf._context._source_dir
    logging.info("archive_name :%s, source dir: %s" % (name, src))

    archive_path = create_archive_file('/tmp', name, [src])
    logging.info("created archive file: %s" % archive_path)
0116 
0117 
if __name__ == '__main__':
    logging.info("start")
    # Switch to the directory that contains this script before running.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    os.chdir(script_dir)
    outcome = test_workflow()
    print('result: %s' % str(outcome))