auto-commit-service/src/auto_commit_service/database/migrate.py
2026-04-17 21:20:13 -07:00

90 lines
2.9 KiB
Python

"""Database migration utilities.
Handles schema migrations for the auto-commit service database.
"""
import logging
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
async def migrate_schema(engine: AsyncEngine) -> None:
    """Run every known schema migration against the database.

    The migration steps are awaited one after another, in their numbered
    order, on a single connection obtained from ``engine.begin()``.

    Args:
        engine: SQLAlchemy async engine
    """
    migration_steps = (
        _add_monorepo_fields,  # 001: monorepo tracking fields on commits
        _add_hostname_field,  # 002: hostname tracking on commits, cycles, errors
    )

    async with engine.begin() as connection:
        for apply_step in migration_steps:
            await apply_step(connection)

    logger.info("Schema migrations completed successfully")
async def _add_monorepo_fields(conn) -> None:
    """Ensure the commits table carries the monorepo tracking columns.

    Columns handled, in the order they are applied:
    - is_monorepo_parent (BOOLEAN NOT NULL DEFAULT 0)
    - submodule_count (INTEGER NOT NULL DEFAULT 0)
    - submodule_metadata (JSON)

    Only columns missing from ``PRAGMA table_info(commits)`` are added, so
    calling this again once they exist issues no ALTER statements.

    Args:
        conn: Connection whose awaitable ``execute`` runs the statements.
    """
    # (column name, statement that creates it), kept in application order.
    column_ddl = (
        (
            "is_monorepo_parent",
            "ALTER TABLE commits ADD COLUMN is_monorepo_parent BOOLEAN NOT NULL DEFAULT 0",
        ),
        (
            "submodule_count",
            "ALTER TABLE commits ADD COLUMN submodule_count INTEGER NOT NULL DEFAULT 0",
        ),
        (
            "submodule_metadata",
            "ALTER TABLE commits ADD COLUMN submodule_metadata JSON",
        ),
    )

    table_info = await conn.execute(text("PRAGMA table_info(commits)"))
    # Index 1 of each table_info row is compared against the column names.
    present = {info_row[1] for info_row in table_info}

    pending = [ddl for column, ddl in column_ddl if column not in present]
    if not pending:
        logger.debug("Monorepo fields already exist in commits table")
        return

    logger.info("Adding monorepo tracking fields to commits table")
    for statement in pending:
        await conn.execute(text(statement))
        logger.info(f"Applied: {statement}")
async def _add_hostname_field(conn) -> None:
    """Add a hostname column and index to the commits, cycles, and errors tables.

    Tracks which host produced each commit/cycle/error for multi-host setups.

    For each table the column is added only when ``PRAGMA table_info`` does
    not already list it. The index statement uses ``IF NOT EXISTS`` and is
    issued for every table on every run, so a table that already has the
    column but lacks the index (for example after an earlier run stopped
    between the two statements) still ends up indexed.

    Args:
        conn: Connection whose awaitable ``execute`` runs the statements.
    """
    # Table names are fixed literals, so interpolating them into SQL is safe.
    tables = ("commits", "cycles", "errors")

    for table in tables:
        result = await conn.execute(text(f"PRAGMA table_info({table})"))
        # Index 1 of each table_info row is the column name.
        existing_columns = {row[1] for row in result}
        column_missing = "hostname" not in existing_columns

        if column_missing:
            await conn.execute(
                text(f"ALTER TABLE {table} ADD COLUMN hostname VARCHAR(255)")
            )
            logger.info("Added hostname column to %s", table)
        else:
            logger.debug("hostname column already exists in %s", table)

        # Deliberately not gated on the column check: the statement is
        # idempotent, and gating it would leave a pre-existing column
        # permanently unindexed.
        await conn.execute(
            text(f"CREATE INDEX IF NOT EXISTS idx_{table}_hostname ON {table}(hostname)")
        )
        if column_missing:
            logger.info("Created hostname index on %s", table)
        else:
            logger.debug("Ensured hostname index exists on %s", table)