File indexing completed on 2026-04-09 07:58:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 performance test to insert contents.
0014 """
0015 import datetime
0016 import json
0017 import time
0018 import threading
0019 import cx_Oracle
0020
0021 from uuid import uuid4 as uuid
0022
0023 from idds.common.config import config_get
0024 from idds.common.constants import (TransformType, TransformStatus, CollectionType,
0025 CollectionRelationType, CollectionStatus,
0026 ContentType, ContentStatus)
0027 from idds.orm.transforms import add_transform
0028 from idds.orm.collections import add_collection
0029
0030
0031
def add_content(coll_id, scope, name, min_id, max_id, content_type=ContentType.File, status=ContentStatus.New,
                bytes=0, md5=None, adler32=None, processing_id=None, storage_id=None, retries=0,
                path=None, expired_at=None, collcontent_metadata=None, connection=None):
    """Insert one row into ``atlas_idds.collections_content`` with a raw SQL statement.

    The INSERT runs on the caller-supplied ``connection`` (not through ``idds.orm``)
    and is NOT committed here; committing is the caller's responsibility.

    :param coll_id: id of the collection the content belongs to.
    :param scope: content scope.
    :param name: content name.
    :param min_id: value stored in the ``min_id`` column.
    :param max_id: value stored in the ``max_id`` column.
    :param content_type: ``ContentType`` member or its raw value.
    :param status: ``ContentStatus`` member or its raw value.
    :param bytes: value stored in the ``bytes`` column (parameter name kept for
        caller compatibility even though it shadows the builtin).
    :param md5, adler32, processing_id, storage_id, retries, path, expired_at:
        stored verbatim in the same-named columns.
    :param collcontent_metadata: JSON-serialisable object stored as a JSON string;
        None is bound as-is (NULL).
    :param connection: open DB-API connection providing ``cursor()``.
    :raises ValueError: if ``connection`` is None.
    """
    if connection is None:
        raise ValueError("add_content requires an open database connection")

    insert_content_sql = """insert into atlas_idds.collections_content(coll_id, scope, name, min_id, max_id, content_type,
                                                                   status, bytes, md5, adler32, processing_id,
                                                                   storage_id, retries, path, created_at, updated_at,
                                                                   expired_at, collcontent_metadata)
                            values(:coll_id, :scope, :name, :min_id, :max_id, :content_type, :status, :bytes,
                                   :md5, :adler32, :processing_id, :storage_id, :retries, :path, :created_at, :updated_at,
                                   :expired_at, :collcontent_metadata)
                         """
    # Enum members are stored by their underlying value.
    if isinstance(content_type, ContentType):
        content_type = content_type.value
    if isinstance(status, ContentStatus):
        status = status.value
    # Compare against None rather than truthiness so an empty dict is stored as "{}"
    # instead of being bound as a raw (unbindable) dict.
    if collcontent_metadata is not None:
        collcontent_metadata = json.dumps(collcontent_metadata)

    # One timestamp for both columns so a freshly inserted row has created_at == updated_at.
    now = datetime.datetime.utcnow()
    params = {'coll_id': coll_id, 'scope': scope, 'name': name, 'min_id': min_id, 'max_id': max_id,
              'content_type': content_type, 'status': status, 'bytes': bytes, 'md5': md5,
              'adler32': adler32, 'processing_id': processing_id, 'storage_id': storage_id,
              'retries': retries, 'path': path, 'created_at': now,
              'updated_at': now, 'expired_at': expired_at,
              'collcontent_metadata': collcontent_metadata}

    cursor = connection.cursor()
    try:
        cursor.execute(insert_content_sql, params)
    finally:
        # Close the cursor even when execute() raises so failed inserts do not leak cursors.
        cursor.close()
0058
0059
def get_transform_prop():
    """Build the keyword arguments passed to ``add_transform`` for the benchmark transform.

    The transform is an EventStreaming transform tagged ``s3128`` in the New state, whose
    metadata points at fixed input/output/log collection ids. ``expired_at`` is the current
    UTC time truncated to whole seconds.
    """
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    metadata = {
        'input': {'coll_id': 123},
        'output': {'coll_id': 456},
        'log': {'coll_id': 789},
    }
    return dict(
        transform_type=TransformType.EventStreaming,
        transform_tag='s3128',
        priority=0,
        status=TransformStatus.New,
        retries=0,
        expired_at=expiry,
        transform_metadata=metadata,
    )
0073
0074
def get_collection_prop():
    """Build the keyword arguments passed to ``add_collection`` for the benchmark collection.

    The collection name carries a fresh uuid4, so every call yields a distinct name.
    ``request_id`` and ``transform_id`` are left as None for the caller to fill in.
    ``expired_at`` is the current UTC time truncated to whole seconds.
    """
    unique_name = 'test_name_%s' % (uuid(),)
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    return dict(
        scope='test_scope',
        name=unique_name,
        coll_type=CollectionType.Dataset,
        request_id=None,
        transform_id=None,
        relation_type=CollectionRelationType.Input,
        coll_size=0,
        status=CollectionStatus.New,
        total_files=0,
        retries=0,
        expired_at=expiry,
        coll_metadata={'ddm_status': 'closed'},
    )
0091
0092
def get_content_prop():
    """Build the keyword arguments passed to ``add_content`` for one synthetic file row.

    ``coll_id`` is None and must be set by the caller. The file name carries a fresh
    uuid4, so every call yields a distinct name. ``expired_at`` is the current UTC
    time truncated to whole seconds.
    """
    unique_name = 'test_file_name_%s' % (uuid(),)
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    return dict(
        coll_id=None,
        scope='test_scope',
        name=unique_name,
        min_id=0,
        max_id=100,
        content_type=ContentType.File,
        status=ContentStatus.New,
        bytes=1,
        md5=None,
        adler32=None,
        processing_id=None,
        storage_id=None,
        retries=0,
        path=None,
        expired_at=expiry,
        collcontent_metadata={'id': 123},
    )
0113
0114
def test_insert_contents(coll_id, num_contents=1, db_pool=None):
    """Insert ``num_contents`` generated content rows for ``coll_id`` in one transaction.

    A single connection is acquired from ``db_pool`` for the whole batch. One commit is
    issued after all rows are inserted, so ``num_contents`` is effectively the batch size.

    :param coll_id: collection id stamped onto every generated row.
    :param num_contents: number of rows to insert before committing.
    :param db_pool: pool exposing ``acquire()``/``release()`` (a ``cx_Oracle.SessionPool`` here).
    """
    connection = db_pool.acquire()
    try:
        for _ in range(num_contents):
            content_properties = get_content_prop()
            content_properties['coll_id'] = coll_id
            add_content(connection=connection, **content_properties)
        connection.commit()
    finally:
        # Always hand the connection back. Otherwise a failed insert/commit permanently
        # removes a session from the pool and later acquire() calls can block.
        db_pool.release(connection)
0123
0124
def test_thread(num_contents_per_thread, num_contents_per_session, coll_id, db_pool):
    """Worker body: insert ``num_contents_per_thread`` rows in batches of ``num_contents_per_session``.

    Each batch is handled by ``test_insert_contents`` on its own pooled connection and
    committed there. When the total is not a multiple of the batch size, a final smaller
    batch inserts the remainder, so exactly ``num_contents_per_thread`` rows are written.
    Non-positive totals insert nothing.

    :raises ValueError: if ``num_contents_per_session`` is not positive.
    """
    if num_contents_per_session <= 0:
        raise ValueError("num_contents_per_session must be positive, got %r" % (num_contents_per_session,))

    full_sessions, remainder = divmod(max(num_contents_per_thread, 0), num_contents_per_session)
    for _ in range(full_sessions):
        test_insert_contents(coll_id, num_contents_per_session, db_pool)
    if remainder:
        test_insert_contents(coll_id, remainder, db_pool)
0128
0129
def get_session_pool(min_sessions=12, max_sessions=20, increment=1):
    """Create a ``cx_Oracle.SessionPool`` from the ``[database] default`` config entry.

    The entry is expected to look like ``<scheme>://user:password@dsn``. Any
    ``<scheme>://`` prefix (e.g. ``oracle://`` or ``oracle+cx_oracle://``) is dropped.
    Credentials are split on the LAST ``@`` and the FIRST ``:``, so passwords containing
    ``@`` or ``:`` are handled. The remaining DSN is passed to cx_Oracle unchanged.

    :param min_sessions: minimum number of pooled sessions (default matches the original 12).
    :param max_sessions: maximum number of pooled sessions (default matches the original 20).
    :param increment: sessions opened each time the pool grows.
    :raises ValueError: if the connection string lacks ``user:password@dsn`` structure.
    """
    sql_connection = config_get('database', 'default')
    _, scheme_sep, remainder = sql_connection.partition('://')
    if scheme_sep:
        sql_connection = remainder

    # rpartition: the DSN never contains '@', but a password might.
    user_pass, at_sep, tns = sql_connection.rpartition('@')
    if not at_sep:
        raise ValueError("database connection string has no '@' separating credentials from the DSN")
    # partition: user names do not contain ':', but a password might.
    user, colon_sep, passwd = user_pass.partition(':')
    if not colon_sep:
        raise ValueError("database connection string credentials must be of the form 'user:password'")

    db_pool = cx_Oracle.SessionPool(user, passwd, tns, min=min_sessions, max=max_sessions, increment=increment)
    return db_pool
0137
0138
def test(num_threads=1, total_contents=1, num_colls_per_session=1):
    """Benchmark concurrent content inserts and print the elapsed wall-clock time.

    Creates a session pool, plus one transform and one collection through ``idds.orm``.
    It then runs ``num_threads`` workers (``test_thread``) that together insert
    ``total_contents`` rows into that collection. Only the thread phase is timed.

    :param num_threads: number of concurrent worker threads (must be >= 1).
    :param total_contents: total rows to insert across all threads. If it does not divide
        evenly, the first ``total_contents % num_threads`` threads take one extra row.
    :param num_colls_per_session: rows each worker inserts per pooled connection/commit.
        Despite the name, this counts contents, not collections.
    :raises ValueError: if ``num_threads`` < 1.
    """
    if num_threads < 1:
        raise ValueError("num_threads must be at least 1, got %r" % (num_threads,))

    db_pool = get_session_pool()

    trans_properties = get_transform_prop()
    trans_id = add_transform(**trans_properties)
    coll_properties = get_collection_prop()
    coll_properties['transform_id'] = trans_id
    coll_id = add_collection(**coll_properties)

    # Per-thread shares that add up to exactly total_contents.
    base_share, extra = divmod(total_contents, num_threads)

    time_start = time.perf_counter()
    threads = [threading.Thread(target=test_thread,
                                args=(base_share + (1 if i < extra else 0),
                                      num_colls_per_session, coll_id, db_pool))
               for i in range(num_threads)]
    for thread in threads:
        thread.start()
    # join() returns as soon as each worker finishes, so the measured interval ends
    # with the last worker and includes no polling/sleep delay.
    for thread in threads:
        thread.join()
    time_end = time.perf_counter()
    print("num_threads=%s, total_contents=%s, num_colls_per_session=%s, time used: %s"
          % (num_threads, total_contents, num_colls_per_session, time_end - time_start))
0160
0161
if __name__ == '__main__':
    # Benchmark matrix, executed in order: (num_threads, total_contents, num_colls_per_session).
    benchmark_runs = (
        (1, 1, 1),
        (1, 10000, 100),
        (1, 100000, 100),
        (1, 1000000, 100),
        (10, 10000, 100),
        (10, 100000, 100),
        (10, 1000000, 100),
    )
    for threads_n, contents_n, per_session in benchmark_runs:
        test(num_threads=threads_n, total_contents=contents_n, num_colls_per_session=per_session)