Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2021
0010 
0011 
0012 """
0013 Archive old requests and their dependent records into the *_archive tables.
0014 """
0015 import cx_Oracle
0016 
0017 
0018 from idds.common.config import config_get
0019 # from idds.core.contents import add_content
0020 
0021 
def get_archive_sql(schema, months=3):
    """
    Build the anonymous PL/SQL block that archives old requests.

    The block loops over every row of ``{schema}.requests`` whose status is
    one of (3, 5, 9, 17) and whose ``created_at`` is older than ``months``
    months. For each such request it copies the request row and the rows of
    transforms, processings, collections, contents and messages that share
    its ``request_id`` into the matching ``*_archive`` tables, then deletes
    the originals (children first, the request row last). A single COMMIT
    is issued after the loop.

    :param schema: database schema name. It is interpolated verbatim into
                   the statement, so it must come from a trusted source.
    :param months: age threshold in whole months (non-negative int).
                   Defaults to 3.

    :returns: the PL/SQL text, ready to be passed to ``cursor.execute``.
    :raises ValueError: if ``months`` is not a non-negative integer.
    """
    if isinstance(months, bool) or not isinstance(months, int) or months < 0:
        raise ValueError("months must be a non-negative integer, got %r" % (months,))

    # NOTE: the cutoff uses add_months() rather than "sysdate - interval 'N' month".
    # Interval arithmetic raises ORA-01839 when the resulting day does not exist
    # (e.g. May 31 minus 3 months), which would make the job fail on such days;
    # add_months() clamps to the last day of the target month instead.
    sql = """
        BEGIN
            FOR i in (SELECT request_id, scope, name, requester, request_type, transform_tag,
                             workload_id, priority, status, substatus, locking, created_at,
                             updated_at, next_poll_at, accessed_at, expired_at, errors,
                             request_metadata, processing_metadata
                      FROM   {schema}.requests
                      WHERE  status in (3, 5, 9, 17) and created_at < add_months(sysdate, -{months})
                      order by request_id asc)
            LOOP
                --- archive records
                insert into {schema}.requests_archive(request_id, scope, name, requester,
                            request_type, transform_tag, workload_id, priority, status,
                            substatus, locking, created_at, updated_at, next_poll_at,
                            accessed_at, expired_at, errors, request_metadata,
                            processing_metadata)
                    values(i.request_id, i.scope, i.name, i.requester, i.request_type, i.transform_tag,
                           i.workload_id, i.priority, i.status, i.substatus, i.locking, i.created_at,
                           i.updated_at, i.next_poll_at, i.accessed_at, i.expired_at, i.errors,
                           i.request_metadata, i.processing_metadata);

                insert into {schema}.transforms_archive(transform_id, request_id, workload_id,
                                                        transform_type, transform_tag, priority,
                                                        safe2get_output_from_input, status,
                                                        substatus, locking, retries, created_at,
                                                        updated_at, next_poll_at, started_at,
                                                        finished_at, expired_at, transform_metadata,
                                                        running_metadata)
                    select transform_id, request_id, workload_id, transform_type, transform_tag,
                           priority, safe2get_output_from_input, status, substatus, locking,
                           retries, created_at, updated_at, next_poll_at, started_at, finished_at,
                           expired_at, transform_metadata, running_metadata
                    from {schema}.transforms where request_id=i.request_id;

                insert into {schema}.processings_archive(processing_id, transform_id, request_id,
                           workload_id, status, substatus, locking, submitter, submitted_id,
                           granularity, granularity_type, created_at, updated_at, next_poll_at,
                           submitted_at, finished_at, expired_at, processing_metadata,
                           running_metadata, output_metadata)
                    select processing_id, transform_id, request_id, workload_id, status, substatus,
                           locking, submitter, submitted_id, granularity, granularity_type,
                           created_at, updated_at, next_poll_at, submitted_at, finished_at, expired_at,
                           processing_metadata, running_metadata, output_metadata
                    from {schema}.processings where request_id=i.request_id;

                insert into {schema}.collections_archive(coll_id, coll_type, transform_id, request_id,
                           workload_id, relation_type, scope, name, bytes, status, substatus, locking,
                           total_files, storage_id, new_files, processed_files, processing_files,
                           processing_id, retries, created_at, updated_at, next_poll_at, accessed_at,
                           expired_at, coll_metadata)
                    select coll_id, coll_type, transform_id, request_id, workload_id, relation_type,
                           scope, name, bytes, status, substatus, locking, total_files, storage_id,
                           new_files, processed_files, processing_files, processing_id, retries,
                           created_at, updated_at, next_poll_at, accessed_at, expired_at,
                           coll_metadata
                    from {schema}.collections where request_id=i.request_id;

                insert into {schema}.contents_archive(content_id, transform_id, coll_id, request_id,
                           workload_id, map_id, scope, name, min_id, max_id, content_type,
                           content_relation_type, status, substatus, locking, bytes, md5, adler32,
                           processing_id, storage_id, retries, path, created_at, updated_at,
                           accessed_at, expired_at, content_metadata)
                    select content_id, transform_id, coll_id, request_id, workload_id, map_id,
                           scope, name, min_id, max_id, content_type, content_relation_type,
                           status, substatus, locking, bytes, md5, adler32, processing_id,
                           storage_id, retries, path, created_at, updated_at, accessed_at,
                           expired_at, content_metadata
                    from {schema}.contents where request_id=i.request_id;

                insert into {schema}.messages_archive(msg_id, msg_type, status, substatus, locking,
                           source, destination, request_id, workload_id, transform_id, processing_id,
                           num_contents, created_at, updated_at, msg_content)
                    select msg_id, msg_type, status, substatus, locking, source, destination,
                           request_id, workload_id, transform_id, processing_id, num_contents,
                           created_at, updated_at, msg_content
                    from {schema}.messages where request_id=i.request_id;


                -- clean records
                delete from {schema}.messages where request_id = i.request_id;
                delete from {schema}.contents where request_id = i.request_id;
                delete from {schema}.collections where request_id = i.request_id;
                delete from {schema}.processings where request_id = i.request_id;
                delete from {schema}.transforms where request_id = i.request_id;
                delete from {schema}.requests where request_id = i.request_id;
            END LOOP;
            COMMIT;
    END;
    """
    return sql.format(schema=schema, months=months)
0114 
0115 
def run_archive_sql(db_pool, schema):
    """
    Execute the archive PL/SQL block on a connection taken from the pool.

    :param db_pool: session pool providing ``acquire()`` and ``release()``.
    :param schema: database schema name passed to ``get_archive_sql``.

    Any exception raised while executing the statement propagates to the
    caller; the cursor is still closed and the connection is still handed
    back to the pool in that case.
    """
    # Build the statement first so no session is held if that step fails.
    sql = get_archive_sql(schema)
    # print(sql)

    connection = db_pool.acquire()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(sql)
        finally:
            cursor.close()

        connection.commit()
    finally:
        # Always return the session, otherwise a failed run leaks it.
        db_pool.release(connection)
0127 
0128 
def get_session_pool():
    """
    Create a cx_Oracle session pool from the 'database' configuration section.

    The 'default' option is expected to look like
    ``oracle://<user>:<password>@<tns>``; the 'schema' option supplies the
    schema name.

    :returns: tuple ``(db_pool, schema)``.
    :raises ValueError: if the connection string has no ``user:password@tns``
                        structure.
    """
    sql_connection = config_get('database', 'default')

    # Strip the scheme only when it is a prefix, so the same text occurring
    # later in the string (e.g. inside the password) is left alone.
    scheme = "oracle://"
    if sql_connection.startswith(scheme):
        sql_connection = sql_connection[len(scheme):]

    # Split on the LAST '@' and the FIRST ':' so that passwords containing
    # '@' or ':' are accepted (assumes the TNS part itself has no '@').
    user_pass, at_sign, tns = sql_connection.rpartition('@')
    user, colon, passwd = user_pass.partition(':')
    if not at_sign or not colon:
        # Do not echo the connection string: it contains credentials.
        raise ValueError("database connection string must look like "
                         "'oracle://user:password@tns'")

    db_pool = cx_Oracle.SessionPool(user, passwd, tns, min=12, max=20, increment=1)

    schema = config_get('database', 'schema')
    return db_pool, schema
0138 
0139 
def run_archive():
    """
    Archive old requests using the database settings from the configuration.

    Obtains the session pool and schema name from ``get_session_pool`` and
    forwards both, in that order, to ``run_archive_sql``.
    """
    run_archive_sql(*get_session_pool())
0143 
0144 
# Script entry point: run the archive job only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run_archive()