|
import asyncio |
|
from logging.config import fileConfig |
|
|
|
from alembic import context |
|
from sqlalchemy import engine_from_config, pool |
|
from sqlalchemy.ext.asyncio import AsyncEngine |
|
|
|
from app.core import config as app_config |
|
|
|
|
|
|
|
config = context.config |
|
|
|
|
|
|
|
fileConfig(config.config_file_name) |
|
|
|
|
|
|
|
|
|
|
|
from app.models.database.base import Base |
|
|
|
target_metadata = Base.metadata |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_database_uri(): |
|
return app_config.settings.SQLALCHEMY_DATABASE_URI |
|
|
|
|
|
def run_migrations_offline(): |
|
"""Run migrations in 'offline' mode. |
|
|
|
This configures the context with just a URL |
|
and not an Engine, though an Engine is acceptable |
|
here as well. By skipping the Engine creation |
|
we don't even need a DBAPI to be available. |
|
|
|
Calls to context.execute() here emit the given string to the |
|
script output. |
|
|
|
""" |
|
url = get_database_uri() |
|
context.configure( |
|
url=url, |
|
target_metadata=target_metadata, |
|
literal_binds=True, |
|
dialect_opts={"paramstyle": "named"}, |
|
compare_type=True, |
|
compare_server_default=True, |
|
) |
|
|
|
with context.begin_transaction(): |
|
context.run_migrations() |
|
|
|
|
|
def do_run_migrations(connection): |
|
context.configure( |
|
connection=connection, target_metadata=target_metadata, compare_type=True |
|
) |
|
|
|
with context.begin_transaction(): |
|
context.run_migrations() |
|
|
|
|
|
async def run_migrations_online(): |
|
"""Run migrations in 'online' mode. |
|
|
|
In this scenario we need to create an Engine |
|
and associate a connection with the context. |
|
|
|
""" |
|
configuration = config.get_section(config.config_ini_section) |
|
assert configuration |
|
configuration["sqlalchemy.url"] = get_database_uri() |
|
connectable = AsyncEngine( |
|
engine_from_config( |
|
configuration, |
|
prefix="sqlalchemy.", |
|
poolclass=pool.NullPool, |
|
future=True, |
|
) |
|
) |
|
async with connectable.connect() as connection: |
|
await connection.run_sync(do_run_migrations) |
|
|
|
|
|
if context.is_offline_mode(): |
|
run_migrations_offline() |
|
else: |
|
asyncio.run(run_migrations_online()) |