File indexing completed on 2026-04-09 07:58:19
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 """
0013 Utils to create the database or destroy the database
0014 """
0015
0016 import io
0017 import os
0018 import traceback
0019
0020
0021 from alembic.config import Config
0022 from alembic import command
0023
0024 import sqlalchemy
0025
0026
0027 from sqlalchemy import inspect
0028 from sqlalchemy.dialects.postgresql.base import PGInspector
0029 from sqlalchemy.schema import CreateSchema, MetaData, Table, DropTable, ForeignKeyConstraint, DropConstraint
0030 from sqlalchemy.sql.ddl import DropSchema
0031
0032 from idds.common.config import config_has_option, config_get
0033 from idds.orm.base import session, models
0034
0035
def build_database(echo=True, tests=False):
    """Create the configured schema, register the models and sync alembic.

    :param echo: forwarded to ``session.get_engine`` as its ``echo`` argument.
    :param tests: accepted but not used by this function.
    """
    engine = session.get_engine(echo=echo)

    # The schema name is only looked up when the option exists in the config.
    schema = config_get('database', 'schema') if config_has_option('database', 'schema') else None
    if schema:
        print('Schema set in config, trying to create schema:', schema)
        # Best effort: any failure is reported and the build carries on.
        try:
            with engine.connect() as conn, conn.begin():
                conn.execute(CreateSchema(schema))
        except Exception as err:
            print('Cannot create schema, please validate manually if schema creation is needed, continuing:', err)

    models.register_models(engine)

    cfg_path = os.environ.get("ALEMBIC_CONFIG")
    alembic_cfg = Config(cfg_path)

    # Run ``command.current`` against a second Config whose stdout is an
    # in-memory buffer, then read back whatever it printed.
    captured = io.StringIO()
    command.current(Config(cfg_path, stdout=captured))
    current_version = captured.getvalue()
    print("current_version: " + current_version)

    if not current_version:
        # Nothing was reported: stamp "head" before upgrading.
        print("no current version, stamp it")
        command.stamp(alembic_cfg, "head")
    else:
        print("has current version, try to upgrade")
    command.upgrade(alembic_cfg, "head")
0071
0072
def destroy_database(echo=True):
    """Unregister the models from the engine.

    Any exception is printed together with its traceback and not re-raised.

    :param echo: forwarded to ``session.get_engine`` as its ``echo`` argument.
    """
    db_engine = session.get_engine(echo=echo)

    try:
        models.unregister_models(db_engine)
    except Exception as err:
        failure_trace = traceback.format_exc()
        print('Cannot destroy schema -- assuming already gone, continuing:', err)
        print(failure_trace)
0082
0083
def destroy_everything(echo=True):
    """Drop tables, foreign-key constraints, the configured schema and enums.

    Walks the inspector's sorted table / foreign-key listing in reverse,
    dropping each table, or each listed constraint when the dialect supports
    ALTER. Afterwards drops the schema named in the config (with cascade) and,
    on PostgreSQL, every enum type the inspector reports.
    Any exception is printed together with its traceback and not re-raised.
    Ref. http://www.sqlalchemy.org/trac/wiki/UsageRecipes/DropEverything

    :param echo: forwarded to ``session.get_engine`` as its ``echo`` argument.
    """
    engine = session.get_engine(echo=echo)

    try:
        with engine.begin() as conn:
            inspector = inspect(conn)

            ordered = inspector.get_sorted_table_and_fkc_names(schema='*')
            for table_name, fk_pairs in reversed(ordered):
                if table_name:
                    conn.execute(DropTable(Table(table_name, MetaData(), schema='*')))
                    continue
                # Entries without a table name only carry constraints; they
                # are skipped when empty or when the dialect cannot ALTER.
                if not fk_pairs or not engine.dialect.supports_alter:
                    continue
                for owner_name, fk_name in fk_pairs:
                    constraint = ForeignKeyConstraint((), (), name=fk_name)
                    # Attach the constraint to a throwaway Table so that it
                    # has a parent when the DROP CONSTRAINT is built.
                    Table(owner_name, MetaData(), constraint)
                    conn.execute(DropConstraint(constraint))

            schema = config_get('database', 'schema') if config_has_option('database', 'schema') else None
            if schema:
                conn.execute(DropSchema(schema, cascade=True))

            if engine.dialect.name == 'postgresql':
                assert isinstance(inspector, PGInspector), 'expected a PGInspector'
                for enum_spec in inspector.get_enums(schema='*'):
                    sqlalchemy.Enum(**enum_spec).drop(bind=conn)

    except Exception as err:
        failure_trace = traceback.format_exc()
        print('Cannot destroy db:', err)
        print(failure_trace)
0125
0126
def dump_schema():
    """Unregister and then re-register all models against the dump engine.

    A newline is printed between the two steps.
    """
    dump_engine = session.get_dump_engine()

    models.unregister_models(dump_engine)
    print("\n")
    models.register_models(dump_engine)
0133
0134
def row2dict(row):
    """Convert a single result row into a plain ``dict``.

    :param row: a mapping-like object offering ``keys()`` and item access, or
        an object exposing such a mapping through a ``_mapping`` attribute
        (as SQLAlchemy 1.4+/2.0 ``Row`` objects do).
    :returns: a new dict whose keys are ``str(column)`` and whose values are
        the corresponding row values.
    """
    # SQLAlchemy 2.0 rows no longer support ``row.keys()`` / ``row[name]``
    # directly; their ``_mapping`` view does. Objects without that attribute
    # (plain dicts, legacy rows) are used as they are.
    mapping = getattr(row, '_mapping', row)
    return {str(col): mapping[col] for col in mapping.keys()}
0141
0142
def rows2dict(rows):
    """Convert an iterable of result rows into a list of plain ``dict`` objects.

    :param rows: iterable of mapping-like objects offering ``keys()`` and item
        access, or objects exposing such a mapping through a ``_mapping``
        attribute (as SQLAlchemy 1.4+/2.0 ``Row`` objects do).
    :returns: a list with one dict per row, in iteration order; keys are
        ``str(column)``.
    """
    # SQLAlchemy 2.0 rows no longer support ``row.keys()`` / ``row[name]``
    # directly; their ``_mapping`` view does. Objects without that attribute
    # (plain dicts, legacy rows) are used as they are.
    mappings = (getattr(row, '_mapping', row) for row in rows)
    return [{str(col): mapping[col] for col in mapping.keys()} for mapping in mappings]
0151 return results