Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:19

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2023
0010 
0011 
0012 """
0013 Utils to create the database or destroy the database
0014 """
0015 
0016 import io
0017 import os
0018 import traceback
0019 # from typing import Union
0020 
0021 from alembic.config import Config
0022 from alembic import command
0023 
0024 import sqlalchemy
0025 # from sqlalchemy.engine import reflection
0026 # from sqlalchemy.engine import Inspector
0027 from sqlalchemy import inspect
0028 from sqlalchemy.dialects.postgresql.base import PGInspector
0029 from sqlalchemy.schema import CreateSchema, MetaData, Table, DropTable, ForeignKeyConstraint, DropConstraint
0030 from sqlalchemy.sql.ddl import DropSchema
0031 
0032 from idds.common.config import config_has_option, config_get
0033 from idds.orm.base import session, models
0034 
0035 
0036 def build_database(echo=True, tests=False):
0037     """Build the database. """
0038     engine = session.get_engine(echo=echo)
0039 
0040     if config_has_option('database', 'schema'):
0041         schema = config_get('database', 'schema')
0042         if schema:
0043             print('Schema set in config, trying to create schema:', schema)
0044             try:
0045                 with engine.connect() as conn:
0046                     with conn.begin():
0047                         conn.execute(CreateSchema(schema))
0048             except Exception as e:
0049                 print('Cannot create schema, please validate manually if schema creation is needed, continuing:', e)
0050                 # print(traceback.format_exc())
0051 
0052     models.register_models(engine)
0053 
0054     # record the head version of alembic
0055     alembic_cfg_file = os.environ.get("ALEMBIC_CONFIG")
0056     alembic_cfg = Config(alembic_cfg_file)
0057 
0058     output_buffer = io.StringIO()
0059     alembic_cfg1 = Config(alembic_cfg_file, stdout=output_buffer)
0060     current_version = command.current(alembic_cfg1)
0061     current_version = output_buffer.getvalue()
0062     print("current_version: " + current_version)
0063 
0064     if current_version is None or len(current_version) == 0:
0065         print("no current version, stamp it")
0066         command.stamp(alembic_cfg, "head")
0067         command.upgrade(alembic_cfg, "head")
0068     else:
0069         print("has current version, try to upgrade")
0070         command.upgrade(alembic_cfg, "head")
0071 
0072 
0073 def destroy_database(echo=True):
0074     """Destroy the database"""
0075     engine = session.get_engine(echo=echo)
0076 
0077     try:
0078         models.unregister_models(engine)
0079     except Exception as e:
0080         print('Cannot destroy schema -- assuming already gone, continuing:', e)
0081         print(traceback.format_exc())
0082 
0083 
0084 def destroy_everything(echo=True):
0085     """ Using metadata.reflect() to get all constraints and tables.
0086         metadata.drop_all() as it handles cyclical constraints between tables.
0087         Ref. http://www.sqlalchemy.org/trac/wiki/UsageRecipes/DropEverything
0088     """
0089     engine = session.get_engine(echo=echo)
0090 
0091     try:
0092         # the transaction only applies if the DB supports
0093         # transactional DDL, i.e. Postgresql, MS SQL Server
0094         with engine.begin() as conn:
0095 
0096             inspector = inspect(conn)
0097 
0098             for tname, fkcs in reversed(
0099                     inspector.get_sorted_table_and_fkc_names(schema='*')):
0100                 if tname:
0101                     drop_table_stmt = DropTable(Table(tname, MetaData(), schema='*'))
0102                     conn.execute(drop_table_stmt)
0103                 elif fkcs:
0104                     if not engine.dialect.supports_alter:
0105                         continue
0106                     for tname, fkc in fkcs:
0107                         fk_constraint = ForeignKeyConstraint((), (), name=fkc)
0108                         Table(tname, MetaData(), fk_constraint)
0109                         drop_constraint_stmt = DropConstraint(fk_constraint)
0110                         conn.execute(drop_constraint_stmt)
0111 
0112             if config_has_option('database', 'schema'):
0113                 schema = config_get('database', 'schema')
0114                 if schema:
0115                     conn.execute(DropSchema(schema, cascade=True))
0116 
0117             if engine.dialect.name == 'postgresql':
0118                 assert isinstance(inspector, PGInspector), 'expected a PGInspector'
0119                 for enum in inspector.get_enums(schema='*'):
0120                     sqlalchemy.Enum(**enum).drop(bind=conn)
0121 
0122     except Exception as e:
0123         print('Cannot destroy db:', e)
0124         print(traceback.format_exc())
0125 
0126 
0127 def dump_schema():
0128     """ Creates a schema dump to a specific database. """
0129     engine = session.get_dump_engine()
0130     models.unregister_models(engine)
0131     print("\n")
0132     models.register_models(engine)
0133 
0134 
0135 def row2dict(row):
0136     """ Convert rows to dict. """
0137     row_dict = {}
0138     for col in row.keys():
0139         row_dict[str(col)] = row[col]
0140     return row_dict
0141 
0142 
0143 def rows2dict(rows):
0144     """ Convert rows to dict. """
0145     results = []
0146     for row in rows:
0147         row_dict = {}
0148         for col in row.keys():
0149             row_dict[str(col)] = row[col]
0150         results.append(row_dict)
0151     return results