Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019
0010 
0011 
0012 """
0013 Performance test for bulk-inserting contents through the iDDS ORM.
0014 """
0015 import datetime
0016 import time
0017 import threading
0018 
0019 from uuid import uuid4 as uuid
0020 
0021 from idds.orm.base.session import transactional_session
0022 from idds.common.constants import (TransformType, TransformStatus, CollectionType,
0023                                    CollectionRelationType, CollectionStatus,
0024                                    ContentType, ContentStatus)
0025 from idds.orm.transforms import add_transform
0026 from idds.orm.collections import add_collection
0027 from idds.orm.contents import add_contents
0028 
0029 
def get_transform_prop():
    """Build the keyword arguments for creating a test transform.

    Returns:
        dict: properties suitable for ``add_transform(**props)``. ``expired_at``
        is the current naive UTC time truncated to whole seconds, and the
        metadata references fixed placeholder collection ids.
    """
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    metadata = {
        'input': {'coll_id': 123},
        'output': {'coll_id': 456},
        'log': {'coll_id': 789},
    }
    return dict(
        transform_type=TransformType.EventStreaming,
        transform_tag='s3128',
        priority=0,
        status=TransformStatus.New,
        retries=0,
        expired_at=expiry,
        transform_metadata=metadata,
    )
0043 
0044 
def get_collection_prop():
    """Build the keyword arguments for creating a test input collection.

    The name carries a random UUID4 suffix, so every call yields a distinct
    collection name. ``request_id`` and ``transform_id`` are left as ``None``
    for the caller to fill in as needed.

    Returns:
        dict: properties suitable for ``add_collection(**props)``.
    """
    unique_name = 'test_name_%s' % str(uuid())
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    return dict(
        scope='test_scope',
        name=unique_name,
        coll_type=CollectionType.Dataset,
        request_id=None,
        transform_id=None,
        relation_type=CollectionRelationType.Input,
        coll_size=0,
        status=CollectionStatus.New,
        total_files=0,
        retries=0,
        expired_at=expiry,
        coll_metadata={'ddm_status': 'closed'},
    )
0061 
0062 
def get_content_prop():
    """Build the properties for a single test content (file) row.

    ``coll_id`` is left as ``None`` for the caller to set. The file name
    carries a random UUID4 suffix, so every call yields a distinct name.

    Returns:
        dict: one content entry in the form accepted by ``add_contents``.
    """
    file_name = 'test_file_name_%s' % str(uuid())
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    return dict(
        coll_id=None,
        scope='test_scope',
        name=file_name,
        min_id=0,
        max_id=100,
        content_type=ContentType.File,
        status=ContentStatus.New,
        bytes=1,
        md5=None,
        adler32=None,
        processing_id=None,
        storage_id=None,
        retries=0,
        path=None,
        expired_at=expiry,
        collcontent_metadata={'id': 123},
    )
0083 
0084 
@transactional_session
def test_insert_contents(coll_id, num_contents=1, session=None):
    """Insert ``num_contents`` freshly generated contents into collection ``coll_id``.

    All entries are handed to a single ``add_contents`` call with
    ``bulk_size`` equal to the batch length. ``session`` is presumably
    injected by ``@transactional_session`` (verify against idds.orm.base).
    """
    # dict(base, coll_id=...) copies each generated row with coll_id overridden.
    batch = [dict(get_content_prop(), coll_id=coll_id) for _ in range(num_contents)]
    add_contents(batch, bulk_size=num_contents, session=session)
0094 
0095 
def test_thread(num_contents_per_thread, num_contents_per_session, coll_id):
    """Worker body: insert ``num_contents_per_thread`` contents into ``coll_id``.

    Contents are inserted in batches of ``num_contents_per_session``, with one
    ``test_insert_contents`` call per batch. When the per-thread count is not a
    multiple of the batch size, the remainder is inserted as a final, smaller
    batch, so exactly ``num_contents_per_thread`` contents are inserted.

    Args:
        num_contents_per_thread: total contents this worker inserts; values
            <= 0 insert nothing.
        num_contents_per_session: maximum contents per ``test_insert_contents``
            call; must be positive.
        coll_id: id of the collection receiving the contents.

    Raises:
        ValueError: if ``num_contents_per_session`` is not positive.
    """
    if num_contents_per_session <= 0:
        raise ValueError('num_contents_per_session must be positive, got %s'
                         % num_contents_per_session)
    if num_contents_per_thread <= 0:
        return
    num_full_batches, remainder = divmod(num_contents_per_thread, num_contents_per_session)
    for _ in range(num_full_batches):
        test_insert_contents(coll_id, num_contents_per_session)
    # Without this, a non-divisible count would silently drop the leftover contents.
    if remainder:
        test_insert_contents(coll_id, remainder)
0099 
0100 
def test(num_threads=1, total_contents=1, num_colls_per_session=1):
    """Time concurrent insertion of ``total_contents`` contents and print the result.

    A transform and a collection are created first; this setup is not timed.
    The contents are then split across ``num_threads`` worker threads running
    ``test_thread``. Each worker inserts its share in batches of
    ``num_colls_per_session`` contents. Despite its name, that parameter is a
    number of *contents* per session, not collections.

    When ``total_contents`` is not divisible by ``num_threads``, the remainder
    is spread over the first threads, so ``total_contents`` contents are
    requested in total.

    The printed time is the elapsed wall-clock time from starting the workers
    until all of them have finished.
    """
    trans_properties = get_transform_prop()
    trans_id = add_transform(**trans_properties)
    coll_properties = get_collection_prop()
    coll_properties['transform_id'] = trans_id
    coll_id = add_collection(**coll_properties)

    base_share, leftover = divmod(total_contents, num_threads)
    threads = [
        threading.Thread(target=test_thread,
                         args=(base_share + (1 if i < leftover else 0),
                               num_colls_per_session, coll_id))
        for i in range(num_threads)
    ]

    # perf_counter is monotonic and high-resolution, which suits interval timing.
    time_start = time.perf_counter()
    for thread in threads:
        thread.start()
    # join() returns as soon as each worker finishes, so no polling latency is
    # added to the measured interval.
    for thread in threads:
        thread.join()
    time_end = time.perf_counter()
    print("num_threads=%s, total_contents=%s, num_colls_per_session=%s, time used: %s" % (num_threads, total_contents, num_colls_per_session, time_end - time_start))
0120 
0121 
if __name__ == '__main__':
    # Benchmark matrix, executed in order:
    # (num_threads, total_contents, num_colls_per_session)
    benchmark_runs = (
        (1, 10, 5),
        (1, 1, 1),
        (1, 10000, 1000),
        (1, 100000, 1000),
        (1, 1000000, 1000),
        (10, 10000, 1000),
        (10, 100000, 1000),
        (10, 1000000, 1000),
        (20, 10000, 500),
        (20, 100000, 500),
        (20, 1000000, 500),
    )
    for n_threads, n_contents, batch_size in benchmark_runs:
        test(num_threads=n_threads, total_contents=n_contents, num_colls_per_session=batch_size)