File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 performance test to insert contents.
0014 """
0015 import datetime
0016 import time
0017 import threading
0018
0019 from uuid import uuid4 as uuid
0020
0021 from idds.orm.base.session import transactional_session
0022 from idds.common.constants import (TransformType, TransformStatus, CollectionType,
0023 CollectionRelationType, CollectionStatus,
0024 ContentType, ContentStatus)
0025 from idds.orm.transforms import add_transform
0026 from idds.orm.collections import add_collection
0027 from idds.orm.contents import add_contents
0028
0029
0030 def get_transform_prop():
0031 trans_properties = {
0032 'transform_type': TransformType.EventStreaming,
0033 'transform_tag': 's3128',
0034 'priority': 0,
0035 'status': TransformStatus.New,
0036 'retries': 0,
0037 'expired_at': datetime.datetime.utcnow().replace(microsecond=0),
0038 'transform_metadata': {'input': {'coll_id': 123},
0039 'output': {'coll_id': 456},
0040 'log': {'coll_id': 789}}
0041 }
0042 return trans_properties
0043
0044
0045 def get_collection_prop():
0046 coll_properties = {
0047 'scope': 'test_scope',
0048 'name': 'test_name_%s' % str(uuid()),
0049 'coll_type': CollectionType.Dataset,
0050 'request_id': None,
0051 'transform_id': None,
0052 'relation_type': CollectionRelationType.Input,
0053 'coll_size': 0,
0054 'status': CollectionStatus.New,
0055 'total_files': 0,
0056 'retries': 0,
0057 'expired_at': datetime.datetime.utcnow().replace(microsecond=0),
0058 'coll_metadata': {'ddm_status': 'closed'}
0059 }
0060 return coll_properties
0061
0062
0063 def get_content_prop():
0064 content_properties = {
0065 'coll_id': None,
0066 'scope': 'test_scope',
0067 'name': 'test_file_name_%s' % str(uuid()),
0068 'min_id': 0,
0069 'max_id': 100,
0070 'content_type': ContentType.File,
0071 'status': ContentStatus.New,
0072 'bytes': 1,
0073 'md5': None,
0074 'adler32': None,
0075 'processing_id': None,
0076 'storage_id': None,
0077 'retries': 0,
0078 'path': None,
0079 'expired_at': datetime.datetime.utcnow().replace(microsecond=0),
0080 'collcontent_metadata': {'id': 123}
0081 }
0082 return content_properties
0083
0084
0085 @transactional_session
0086 def test_insert_contents(coll_id, num_contents=1, session=None):
0087
0088 list_contents = []
0089 for i in range(num_contents):
0090 content_properties = get_content_prop()
0091 content_properties['coll_id'] = coll_id
0092 list_contents.append(content_properties)
0093 add_contents(list_contents, bulk_size=num_contents, session=session)
0094
0095
0096 def test_thread(num_contents_per_thread, num_contents_per_session, coll_id):
0097 for i in range(num_contents_per_thread // num_contents_per_session):
0098 test_insert_contents(coll_id, num_contents_per_session)
0099
0100
0101 def test(num_threads=1, total_contents=1, num_colls_per_session=1):
0102 trans_properties = get_transform_prop()
0103 trans_id = add_transform(**trans_properties)
0104 coll_properties = get_collection_prop()
0105 coll_properties['transform_id'] = trans_id
0106 coll_id = add_collection(**coll_properties)
0107
0108 time_start = time.time()
0109 threads = [threading.Thread(target=test_thread, args=(total_contents // num_threads, num_colls_per_session, coll_id)) for i in range(num_threads)]
0110 [thread.start() for thread in threads]
0111 while len(threads) > 0:
0112 left_threads = []
0113 for thread in threads:
0114 if thread.is_alive():
0115 left_threads.append(thread)
0116 time.sleep(0.1)
0117 threads = left_threads
0118 time_end = time.time()
0119 print("num_threads=%s, total_contents=%s, num_colls_per_session=%s, time used: %s" % (num_threads, total_contents, num_colls_per_session, time_end - time_start))
0120
0121
0122 if __name__ == '__main__':
0123 test(num_threads=1, total_contents=10, num_colls_per_session=5)
0124 test(num_threads=1, total_contents=1, num_colls_per_session=1)
0125 test(num_threads=1, total_contents=10000, num_colls_per_session=1000)
0126 test(num_threads=1, total_contents=100000, num_colls_per_session=1000)
0127 test(num_threads=1, total_contents=1000000, num_colls_per_session=1000)
0128 test(num_threads=10, total_contents=10000, num_colls_per_session=1000)
0129 test(num_threads=10, total_contents=100000, num_colls_per_session=1000)
0130 test(num_threads=10, total_contents=1000000, num_colls_per_session=1000)
0131 test(num_threads=20, total_contents=10000, num_colls_per_session=500)
0132 test(num_threads=20, total_contents=100000, num_colls_per_session=500)
0133 test(num_threads=20, total_contents=1000000, num_colls_per_session=500)