File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
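"""
Archive old requests: copy requests created more than three months ago
(with selected statuses) and their related transforms, processings,
collections, contents and messages into the *_archive tables, then delete
them from the live tables.
"""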
0015 import cx_Oracle
0016
0017
0018 from idds.common.config import config_get
0019
0020
0021
def get_archive_sql(schema):
    """
    Build the anonymous PL/SQL block that archives old requests.

    The block loops over every row of ``<schema>.requests`` whose status is
    one of (3, 5, 9, 17) and whose ``created_at`` is more than three months
    in the past. For each such request it:

    1. copies the request row into ``requests_archive``;
    2. copies the rows with the same ``request_id`` from ``transforms``,
       ``processings``, ``collections``, ``contents`` and ``messages`` into
       the corresponding ``*_archive`` tables;
    3. deletes the rows from the live tables in the order messages,
       contents, collections, processings, transforms, requests.

    A single ``COMMIT`` is issued after the loop has finished.

    The three-month cutoff is computed with ``add_months(sysdate, -3)``
    rather than ``sysdate - interval '3' month``: Oracle interval arithmetic
    raises ORA-01839 when the resulting day does not exist in the target
    month (for example when run on May 31), whereas ``add_months`` clamps to
    the last day of the month.

    :param schema: Name of the database schema. It is interpolated verbatim
                   into the SQL text, so it must come from trusted
                   configuration.
    :returns: The PL/SQL block as a string, ready for ``cursor.execute``.
    """
    sql = """
    BEGIN
        FOR i in (SELECT request_id, scope, name, requester, request_type, transform_tag,
                         workload_id, priority, status, substatus, locking, created_at,
                         updated_at, next_poll_at, accessed_at, expired_at, errors,
                         request_metadata, processing_metadata
                  FROM {schema}.requests
                  WHERE status in (3, 5, 9, 17) and created_at < add_months(sysdate, -3)
                  order by request_id asc)
        LOOP
            --- archive records
            insert into {schema}.requests_archive(request_id, scope, name, requester,
                request_type, transform_tag, workload_id, priority, status,
                substatus, locking, created_at, updated_at, next_poll_at,
                accessed_at, expired_at, errors, request_metadata,
                processing_metadata)
            values(i.request_id, i.scope, i.name, i.requester, i.request_type, i.transform_tag,
                   i.workload_id, i.priority, i.status, i.substatus, i.locking, i.created_at,
                   i.updated_at, i.next_poll_at, i.accessed_at, i.expired_at, i.errors,
                   i.request_metadata, i.processing_metadata);

            insert into {schema}.transforms_archive(transform_id, request_id, workload_id,
                transform_type, transform_tag, priority,
                safe2get_output_from_input, status,
                substatus, locking, retries, created_at,
                updated_at, next_poll_at, started_at,
                finished_at, expired_at, transform_metadata,
                running_metadata)
            select transform_id, request_id, workload_id, transform_type, transform_tag,
                   priority, safe2get_output_from_input, status, substatus, locking,
                   retries, created_at, updated_at, next_poll_at, started_at, finished_at,
                   expired_at, transform_metadata, running_metadata
            from {schema}.transforms where request_id=i.request_id;

            insert into {schema}.processings_archive(processing_id, transform_id, request_id,
                workload_id, status, substatus, locking, submitter, submitted_id,
                granularity, granularity_type, created_at, updated_at, next_poll_at,
                submitted_at, finished_at, expired_at, processing_metadata,
                running_metadata, output_metadata)
            select processing_id, transform_id, request_id, workload_id, status, substatus,
                   locking, submitter, submitted_id, granularity, granularity_type,
                   created_at, updated_at, next_poll_at, submitted_at, finished_at, expired_at,
                   processing_metadata, running_metadata, output_metadata
            from {schema}.processings where request_id=i.request_id;

            insert into {schema}.collections_archive(coll_id, coll_type, transform_id, request_id,
                workload_id, relation_type, scope, name, bytes, status, substatus, locking,
                total_files, storage_id, new_files, processed_files, processing_files,
                processing_id, retries, created_at, updated_at, next_poll_at, accessed_at,
                expired_at, coll_metadata)
            select coll_id, coll_type, transform_id, request_id, workload_id, relation_type,
                   scope, name, bytes, status, substatus, locking, total_files, storage_id,
                   new_files, processed_files, processing_files, processing_id, retries,
                   created_at, updated_at, next_poll_at, accessed_at, expired_at,
                   coll_metadata
            from {schema}.collections where request_id=i.request_id;

            insert into {schema}.contents_archive(content_id, transform_id, coll_id, request_id,
                workload_id, map_id, scope, name, min_id, max_id, content_type,
                content_relation_type, status, substatus, locking, bytes, md5, adler32,
                processing_id, storage_id, retries, path, created_at, updated_at,
                accessed_at, expired_at, content_metadata)
            select content_id, transform_id, coll_id, request_id, workload_id, map_id,
                   scope, name, min_id, max_id, content_type, content_relation_type,
                   status, substatus, locking, bytes, md5, adler32, processing_id,
                   storage_id, retries, path, created_at, updated_at, accessed_at,
                   expired_at, content_metadata
            from {schema}.contents where request_id=i.request_id;

            insert into {schema}.messages_archive(msg_id, msg_type, status, substatus, locking,
                source, destination, request_id, workload_id, transform_id, processing_id,
                num_contents, created_at, updated_at, msg_content)
            select msg_id, msg_type, status, substatus, locking, source, destination,
                   request_id, workload_id, transform_id, processing_id, num_contents,
                   created_at, updated_at, msg_content
            from {schema}.messages where request_id=i.request_id;


            -- clean records
            delete from {schema}.messages where request_id = i.request_id;
            delete from {schema}.contents where request_id = i.request_id;
            delete from {schema}.collections where request_id = i.request_id;
            delete from {schema}.processings where request_id = i.request_id;
            delete from {schema}.transforms where request_id = i.request_id;
            delete from {schema}.requests where request_id = i.request_id;
        END LOOP;
        COMMIT;
    END;
    """
    sql = sql.format(schema=schema)
    return sql
0114
0115
def run_archive_sql(db_pool, schema):
    """
    Execute the archive PL/SQL block on a connection taken from the pool.

    The cursor is always closed and the connection is always handed back to
    the pool, even when executing the block or committing raises; the
    exception itself is propagated to the caller.

    :param db_pool: Session pool providing ``acquire()`` and ``release()``.
    :param schema: Name of the database schema passed to ``get_archive_sql``.
    """
    # Building the statement has no side effects, so do it before a
    # connection is taken out of the pool.
    sql = get_archive_sql(schema)

    connection = db_pool.acquire()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(sql)
        finally:
            cursor.close()

        connection.commit()
    finally:
        # Never leak the connection: return it to the pool on every path.
        db_pool.release(connection)
0127
0128
def get_session_pool():
    """
    Create a cx_Oracle session pool from the iDDS configuration.

    The ``database.default`` option is read as
    ``oracle://<user>:<password>@<tns>`` and split into its parts; the
    ``database.schema`` option supplies the schema name.

    :returns: Tuple ``(db_pool, schema)`` of the new ``cx_Oracle.SessionPool``
              and the configured schema name.
    :raises ValueError: If the connection string has no ``@`` separator or
                        the credential part has no ``:`` separator.
    """
    sql_connection = config_get('database', 'default')
    sql_connection = sql_connection.replace("oracle://", "")

    # Split on the LAST '@' and the FIRST ':' so that a password containing
    # '@' or ':' no longer makes the two-name unpacking fail. Connection
    # strings that parsed before are parsed identically.
    user_pass, tns = sql_connection.rsplit('@', 1)
    user, passwd = user_pass.split(':', 1)
    db_pool = cx_Oracle.SessionPool(user, passwd, tns, min=12, max=20, increment=1)

    schema = config_get('database', 'schema')
    return db_pool, schema
0138
0139
def run_archive():
    """
    Run one archive pass: build the session pool and schema name from the
    configuration and execute the archive SQL with them.
    """
    # get_session_pool() returns (pool, schema), matching the positional
    # parameters of run_archive_sql.
    run_archive_sql(*get_session_pool())
0143
0144
# Script entry point: run a single archive pass when executed directly
# (nothing happens on import).
if __name__ == '__main__':
    run_archive()