246 lines
7.7 KiB
Python
246 lines
7.7 KiB
Python
"""
|
|
ScanLook Database Migration System
|
|
|
|
Simple migration system that tracks and applies database changes.
|
|
Each migration has a version number and an up() function.
|
|
|
|
Usage:
|
|
from migrations import run_migrations
|
|
run_migrations() # Call on app startup
|
|
"""
|
|
|
|
import sqlite3
|
|
import os
|
|
|
|
DB_PATH = os.path.join(os.path.dirname(__file__), 'database', 'scanlook.db')
|
|
|
|
|
|
def get_db():
|
|
"""Get database connection"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def init_migrations_table():
|
|
"""Create the migrations tracking table if it doesn't exist"""
|
|
conn = get_db()
|
|
conn.execute('''
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
version INTEGER PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
''')
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def get_applied_migrations():
|
|
"""Get list of already-applied migration versions"""
|
|
conn = get_db()
|
|
try:
|
|
rows = conn.execute('SELECT version FROM schema_migrations ORDER BY version').fetchall()
|
|
return [row['version'] for row in rows]
|
|
except:
|
|
return []
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def record_migration(version, name):
|
|
"""Record that a migration was applied"""
|
|
conn = get_db()
|
|
conn.execute('INSERT INTO schema_migrations (version, name) VALUES (?, ?)', [version, name])
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def column_exists(table, column):
|
|
"""Check if a column exists in a table"""
|
|
conn = get_db()
|
|
cursor = conn.execute(f'PRAGMA table_info({table})')
|
|
columns = [row[1] for row in cursor.fetchall()]
|
|
conn.close()
|
|
return column in columns
|
|
|
|
|
|
def table_exists(table):
|
|
"""Check if a table exists"""
|
|
conn = get_db()
|
|
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", [table])
|
|
exists = cursor.fetchone() is not None
|
|
conn.close()
|
|
return exists
|
|
|
|
|
|
# ============================================
|
|
# MIGRATIONS
|
|
# ============================================
|
|
# Add new migrations to this list.
|
|
# Each migration is a tuple: (version, name, up_function)
|
|
#
|
|
# RULES:
|
|
# - Never modify an existing migration
|
|
# - Always add new migrations at the end with the next version number
|
|
# - Check if changes are needed before applying (idempotent)
|
|
# ============================================
|
|
|
|
def migration_001_add_modules_tables():
|
|
"""Add Modules and UserModules tables"""
|
|
conn = get_db()
|
|
|
|
if not table_exists('Modules'):
|
|
conn.execute('''
|
|
CREATE TABLE Modules (
|
|
module_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
module_name TEXT NOT NULL,
|
|
module_key TEXT UNIQUE NOT NULL,
|
|
description TEXT,
|
|
icon TEXT,
|
|
is_active INTEGER DEFAULT 1,
|
|
display_order INTEGER DEFAULT 0
|
|
)
|
|
''')
|
|
print(" Created Modules table")
|
|
|
|
if not table_exists('UserModules'):
|
|
conn.execute('''
|
|
CREATE TABLE UserModules (
|
|
user_module_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
module_id INTEGER NOT NULL,
|
|
granted_by INTEGER,
|
|
granted_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES Users(user_id),
|
|
FOREIGN KEY (module_id) REFERENCES Modules(module_id),
|
|
FOREIGN KEY (granted_by) REFERENCES Users(user_id),
|
|
UNIQUE(user_id, module_id)
|
|
)
|
|
''')
|
|
print(" Created UserModules table")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def migration_002_add_usermodules_granted_columns():
|
|
"""Add granted_by and granted_timestamp to UserModules if missing"""
|
|
conn = get_db()
|
|
|
|
if table_exists('UserModules'):
|
|
if not column_exists('UserModules', 'granted_by'):
|
|
conn.execute('ALTER TABLE UserModules ADD COLUMN granted_by INTEGER')
|
|
print(" Added granted_by column to UserModules")
|
|
|
|
if not column_exists('UserModules', 'granted_timestamp'):
|
|
conn.execute('ALTER TABLE UserModules ADD COLUMN granted_timestamp DATETIME')
|
|
print(" Added granted_timestamp column to UserModules")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def migration_003_add_default_modules():
|
|
"""Add default modules if they don't exist"""
|
|
conn = get_db()
|
|
|
|
# Check if modules exist
|
|
existing = conn.execute('SELECT COUNT(*) as cnt FROM Modules').fetchone()
|
|
|
|
if existing['cnt'] == 0:
|
|
conn.execute('''
|
|
INSERT INTO Modules (module_name, module_key, description, icon, is_active, display_order)
|
|
VALUES ('Inventory Counts', 'counting', 'Cycle counts and physical inventory', 'fa-clipboard-check', 1, 1)
|
|
''')
|
|
conn.execute('''
|
|
INSERT INTO Modules (module_name, module_key, description, icon, is_active, display_order)
|
|
VALUES ('Consumption Sheets', 'cons_sheets', 'Production consumption tracking', 'fa-clipboard-list', 1, 2)
|
|
''')
|
|
print(" Added default modules")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def migration_004_assign_modules_to_admins():
|
|
"""Auto-assign all modules to owner and admin users"""
|
|
conn = get_db()
|
|
|
|
# Get admin users
|
|
admins = conn.execute('SELECT user_id FROM Users WHERE role IN ("owner", "admin")').fetchall()
|
|
modules = conn.execute('SELECT module_id FROM Modules').fetchall()
|
|
|
|
for user in admins:
|
|
for module in modules:
|
|
try:
|
|
conn.execute('''
|
|
INSERT INTO UserModules (user_id, module_id)
|
|
VALUES (?, ?)
|
|
''', [user['user_id'], module['module_id']])
|
|
except sqlite3.IntegrityError:
|
|
pass # Already assigned
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
print(" Assigned modules to admin users")
|
|
|
|
|
|
def migration_005_add_cons_process_fields_duplicate_key():
|
|
"""Add is_duplicate_key column to cons_process_fields if missing"""
|
|
conn = get_db()
|
|
|
|
if table_exists('cons_process_fields'):
|
|
if not column_exists('cons_process_fields', 'is_duplicate_key'):
|
|
conn.execute('ALTER TABLE cons_process_fields ADD COLUMN is_duplicate_key INTEGER DEFAULT 0')
|
|
print(" Added is_duplicate_key column to cons_process_fields")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
# List of all migrations in order
|
|
MIGRATIONS = [
|
|
(1, 'add_modules_tables', migration_001_add_modules_tables),
|
|
(2, 'add_usermodules_granted_columns', migration_002_add_usermodules_granted_columns),
|
|
(3, 'add_default_modules', migration_003_add_default_modules),
|
|
(4, 'assign_modules_to_admins', migration_004_assign_modules_to_admins),
|
|
(5, 'add_cons_process_fields_duplicate_key', migration_005_add_cons_process_fields_duplicate_key),
|
|
]
|
|
|
|
|
|
def run_migrations():
|
|
"""Run all pending migrations"""
|
|
print("🔄 Checking database migrations...")
|
|
|
|
# Make sure migrations table exists
|
|
init_migrations_table()
|
|
|
|
# Get already-applied migrations
|
|
applied = get_applied_migrations()
|
|
|
|
# Run pending migrations
|
|
pending = [(v, n, f) for v, n, f in MIGRATIONS if v not in applied]
|
|
|
|
if not pending:
|
|
print("✅ Database is up to date")
|
|
return
|
|
|
|
print(f"📦 Running {len(pending)} migration(s)...")
|
|
|
|
for version, name, func in pending:
|
|
print(f"\n Migration {version}: {name}")
|
|
try:
|
|
func()
|
|
record_migration(version, name)
|
|
print(f" ✅ Migration {version} complete")
|
|
except Exception as e:
|
|
print(f" ❌ Migration {version} failed: {e}")
|
|
raise
|
|
|
|
print("\n✅ All migrations complete")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
run_migrations()
|