Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:21

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019
0010 
0011 
0012 """
0013 performance test to insert contents.
0014 """
0015 import datetime
0016 import json
0017 import time
0018 import threading
0019 import cx_Oracle
0020 
0021 from uuid import uuid4 as uuid
0022 
0023 from idds.common.config import config_get
0024 from idds.common.constants import (TransformType, TransformStatus, CollectionType,
0025                                    CollectionRelationType, CollectionStatus,
0026                                    ContentType, ContentStatus)
0027 from idds.orm.transforms import add_transform
0028 from idds.orm.collections import add_collection
0029 # from idds.core.contents import add_content
0030 
0031 
def add_content(coll_id, scope, name, min_id, max_id, content_type=ContentType.File, status=ContentStatus.New,
                bytes=0, md5=None, adler32=None, processing_id=None, storage_id=None, retries=0,
                path=None, expired_at=None, collcontent_metadata=None, connection=None):
    """
    Insert one row into atlas_idds.collections_content using a raw DB connection.

    The insert is executed on a cursor created from ``connection``. No commit is
    issued here; the caller decides when to commit.

    :param coll_id: Value bound to the coll_id column.
    :param scope: Value bound to the scope column.
    :param name: Value bound to the name column.
    :param min_id: Value bound to the min_id column.
    :param max_id: Value bound to the max_id column.
    :param content_type: A ContentType member (its ``.value`` is stored) or a raw value.
    :param status: A ContentStatus member (its ``.value`` is stored) or a raw value.
    :param bytes: Value bound to the bytes column.
    :param md5: Value bound to the md5 column.
    :param adler32: Value bound to the adler32 column.
    :param processing_id: Value bound to the processing_id column.
    :param storage_id: Value bound to the storage_id column.
    :param retries: Value bound to the retries column.
    :param path: Value bound to the path column.
    :param expired_at: Value bound to the expired_at column.
    :param collcontent_metadata: Optional metadata; when truthy it is serialized with json.dumps.
    :param connection: Connection object providing ``cursor()``. Required despite the None default.

    :raises AttributeError: If ``connection`` is left as None.
    """
    insert_coll_sql = """insert into atlas_idds.collections_content(coll_id, scope, name, min_id, max_id, content_type,
                                                                   status, bytes, md5, adler32, processing_id,
                                                                   storage_id, retries, path, created_at, updated_at,
                                                                   expired_at, collcontent_metadata)
                         values(:coll_id, :scope, :name, :min_id, :max_id, :content_type, :status, :bytes,
                                :md5, :adler32, :processing_id, :storage_id, :retries, :path, :created_at, :updated_at,
                                :expired_at, :collcontent_metadata)
                      """
    # Enum members are stored by their underlying value; raw values pass through.
    if isinstance(content_type, ContentType):
        content_type = content_type.value
    if isinstance(status, ContentStatus):
        status = status.value
    if collcontent_metadata:
        collcontent_metadata = json.dumps(collcontent_metadata)

    # Take a single timestamp so created_at and updated_at are identical on a new row.
    now = datetime.datetime.utcnow()
    parameters = {'coll_id': coll_id, 'scope': scope, 'name': name, 'min_id': min_id, 'max_id': max_id,
                  'content_type': content_type, 'status': status, 'bytes': bytes, 'md5': md5,
                  'adler32': adler32, 'processing_id': processing_id, 'storage_id': storage_id,
                  'retries': retries, 'path': path, 'created_at': now,
                  'updated_at': now, 'expired_at': expired_at,
                  'collcontent_metadata': collcontent_metadata}

    cursor = connection.cursor()
    try:
        cursor.execute(insert_coll_sql, parameters)
    finally:
        # Close the cursor even when the insert fails so it is not leaked.
        cursor.close()
0058 
0059 
def get_transform_prop():
    """
    Build the keyword arguments describing a sample EventStreaming transform.

    :returns: dict with the transform type, tag, priority, status, retries,
              an expiry timestamp truncated to whole seconds, and metadata
              naming the input/output/log collection ids.
    """
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    metadata = {
        'input': {'coll_id': 123},
        'output': {'coll_id': 456},
        'log': {'coll_id': 789},
    }
    return dict(
        transform_type=TransformType.EventStreaming,
        transform_tag='s3128',
        priority=0,
        status=TransformStatus.New,
        retries=0,
        expired_at=expiry,
        transform_metadata=metadata,
    )
0073 
0074 
def get_collection_prop():
    """
    Build the keyword arguments describing a sample input dataset collection.

    The collection name gets a fresh uuid suffix on every call; request_id and
    transform_id are left as None for the caller to fill in.

    :returns: dict of collection properties.
    """
    unique_name = 'test_name_%s' % str(uuid())
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    return {
        'scope': 'test_scope',
        'name': unique_name,
        'coll_type': CollectionType.Dataset,
        'request_id': None,
        'transform_id': None,
        'relation_type': CollectionRelationType.Input,
        'coll_size': 0,
        'status': CollectionStatus.New,
        'total_files': 0,
        'retries': 0,
        'expired_at': expiry,
        'coll_metadata': {'ddm_status': 'closed'},
    }
0091 
0092 
def get_content_prop():
    """
    Build the keyword arguments describing a sample file content entry.

    The file name gets a fresh uuid suffix on every call; coll_id is left as
    None for the caller to fill in.

    :returns: dict of content properties.
    """
    unique_name = 'test_file_name_%s' % str(uuid())
    expiry = datetime.datetime.utcnow().replace(microsecond=0)
    return {
        'coll_id': None,
        'scope': 'test_scope',
        'name': unique_name,
        'min_id': 0,
        'max_id': 100,
        'content_type': ContentType.File,
        'status': ContentStatus.New,
        'bytes': 1,
        'md5': None,
        'adler32': None,
        'processing_id': None,
        'storage_id': None,
        'retries': 0,
        'path': None,
        'expired_at': expiry,
        'collcontent_metadata': {'id': 123},
    }
0113 
0114 
def test_insert_contents(coll_id, num_contents=1, db_pool=None):
    """
    Insert ``num_contents`` generated content rows for one collection, then commit once.

    A connection is acquired from ``db_pool`` for the whole batch and is always
    released back to the pool, even if an insert or the commit fails.

    :param coll_id: Collection id assigned to every generated content.
    :param num_contents: Number of rows to insert before the single commit.
    :param db_pool: Pool object providing ``acquire()`` and ``release(connection)``.
                    Required despite the None default.
    """
    connection = db_pool.acquire()
    try:
        for _ in range(num_contents):
            content_properties = get_content_prop()
            content_properties['coll_id'] = coll_id
            add_content(**content_properties, connection=connection)
        connection.commit()
    finally:
        # Without this, a failed insert would leave the connection checked out.
        db_pool.release(connection)
0123 
0124 
def test_thread(num_contents_per_thread, num_contents_per_session, coll_id, db_pool):
    """
    Thread worker: insert ``num_contents_per_thread`` contents in per-session batches.

    Each call to test_insert_contents handles one batch of at most
    ``num_contents_per_session`` rows. When the per-thread total is not a
    multiple of the batch size, a final smaller batch inserts the remainder so
    the requested number of rows is actually written.

    :param num_contents_per_thread: Total number of contents this worker inserts.
    :param num_contents_per_session: Number of contents per acquired connection/commit.
    :param coll_id: Collection id the contents are attached to.
    :param db_pool: Connection pool handed through to test_insert_contents.

    :raises ZeroDivisionError: If ``num_contents_per_session`` is 0.
    """
    full_batches, leftover = divmod(num_contents_per_thread, num_contents_per_session)
    for _ in range(full_batches):
        test_insert_contents(coll_id, num_contents_per_session, db_pool)
    # divmod can yield a positive remainder for a negative total; guard so a
    # non-positive request still inserts nothing.
    if num_contents_per_thread > 0 and leftover > 0:
        test_insert_contents(coll_id, leftover, db_pool)
0128 
0129 
def get_session_pool():
    """
    Create a cx_Oracle session pool from the configured 'database'/'default' entry.

    The configured value is parsed as ``[oracle://]user:password@tns``. Only a
    leading ``oracle://`` is stripped. The string is split at the last ``@`` and
    the first ``:`` so a password containing either character still parses.

    :returns: cx_Oracle.SessionPool created with min=12, max=20, increment=1.
    :raises ValueError: If the value has no ``@`` or the credentials part has no ``:``.
    """
    sql_connection = config_get('database', 'default')
    scheme = "oracle://"
    if sql_connection.startswith(scheme):
        sql_connection = sql_connection[len(scheme):]
    # The TNS part follows the last '@'; the user name precedes the first ':'.
    user_pass, tns = sql_connection.rsplit('@', 1)
    user, passwd = user_pass.split(':', 1)
    db_pool = cx_Oracle.SessionPool(user, passwd, tns, min=12, max=20, increment=1)
    return db_pool
0137 
0138 
def test(num_threads=1, total_contents=1, num_colls_per_session=1):
    """
    Run one timed insert benchmark and print the elapsed wall-clock time.

    A new transform and collection are registered first (outside the timed
    section). Then ``num_threads`` worker threads each run test_thread with
    ``total_contents // num_threads`` contents, so any remainder of that
    division is not distributed.

    :param num_threads: Number of worker threads to start.
    :param total_contents: Total number of contents, split evenly across threads.
    :param num_colls_per_session: Passed to test_thread as its per-session batch size.
    """
    db_pool = get_session_pool()

    trans_properties = get_transform_prop()
    trans_id = add_transform(**trans_properties)
    coll_properties = get_collection_prop()
    coll_properties['transform_id'] = trans_id
    coll_id = add_collection(**coll_properties)

    time_start = time.time()
    threads = [threading.Thread(target=test_thread, args=(total_contents // num_threads, num_colls_per_session, coll_id, db_pool)) for i in range(num_threads)]
    for thread in threads:
        thread.start()
    # join() returns as soon as each worker finishes, so the measured time is
    # not padded by a polling sleep.
    for thread in threads:
        thread.join()
    time_end = time.time()
    print("num_threads=%s, total_contents=%s, num_colls_per_session=%s, time used: %s" % (num_threads, total_contents, num_colls_per_session, time_end - time_start))
0160 
0161 
if __name__ == '__main__':
    # Single-row smoke run first, then sweep content volume for 1 and 10 threads
    # with 100 contents per session.
    test(num_threads=1, total_contents=1, num_colls_per_session=1)
    for thread_count in (1, 10):
        for content_count in (10000, 100000, 1000000):
            test(num_threads=thread_count, total_contents=content_count, num_colls_per_session=100)