feat: Implement modular plugin architecture

- Convert invcount to self-contained module
- Add Module Manager for install/uninstall
- Create module_registry database table
- Support hot-reloading of modules
- Move data imports into invcount module
- Update all templates and routes to new structure

Version bumped to 0.16.0
This commit is contained in:
Javier
2026-02-07 01:47:49 -06:00
parent 2a649fdbcc
commit 406219547d
35 changed files with 3887 additions and 492 deletions

359
module_manager.py Normal file
View File

@@ -0,0 +1,359 @@
"""
ScanLook Module Manager
Handles module discovery, installation, uninstallation, and activation
"""
import os
import json
import sqlite3
import importlib.util
from pathlib import Path
from typing import List, Dict, Optional
MODULES_DIR = Path(__file__).parent / 'modules'
DB_PATH = Path(__file__).parent / 'database' / 'scanlook.db'
def get_db():
"""Get database connection (standalone, no Flask context needed)"""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def query_db(query, args=(), one=False):
"""Query database and return results"""
conn = get_db()
cur = conn.execute(query, args)
rv = cur.fetchall()
conn.close()
return (rv[0] if rv else None) if one else rv
def execute_db(query, args=()):
"""Execute database command and return lastrowid"""
conn = get_db()
cur = conn.execute(query, args)
conn.commit()
last_id = cur.lastrowid
conn.close()
return last_id
class ModuleManager:
"""Manages ScanLook modules"""
def __init__(self):
self.modules_dir = MODULES_DIR
self._ensure_modules_table()
def _ensure_modules_table(self):
"""Ensure the module_registry table exists in the database"""
conn = get_db()
conn.execute('''
CREATE TABLE IF NOT EXISTS module_registry (
id INTEGER PRIMARY KEY AUTOINCREMENT,
module_key TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
version TEXT NOT NULL,
author TEXT,
description TEXT,
is_installed INTEGER DEFAULT 0,
is_active INTEGER DEFAULT 0,
installed_at TEXT,
config_json TEXT
)
''')
conn.commit()
conn.close()
def scan_available_modules(self) -> List[Dict]:
"""
Scan the /modules directory for available modules.
Returns list of module info dicts from manifest.json files.
"""
available = []
if not self.modules_dir.exists():
return available
for item in self.modules_dir.iterdir():
if not item.is_dir():
continue
manifest_path = item / 'manifest.json'
if not manifest_path.exists():
continue
try:
with open(manifest_path, 'r') as f:
manifest = json.load(f)
# Validate required fields
required = ['module_key', 'name', 'version', 'author', 'description']
if not all(field in manifest for field in required):
print(f"⚠️ Invalid manifest in {item.name}: missing required fields")
continue
# Check installation status from database
db_module = query_db(
'SELECT is_installed, is_active FROM module_registry WHERE module_key = ?',
[manifest['module_key']],
one=True
)
manifest['is_installed'] = db_module['is_installed'] if db_module else False
manifest['is_active'] = db_module['is_active'] if db_module else False
manifest['path'] = str(item)
available.append(manifest)
except json.JSONDecodeError as e:
print(f"⚠️ Invalid JSON in {manifest_path}: {e}")
continue
except Exception as e:
print(f"⚠️ Error reading manifest from {item.name}: {e}")
continue
return sorted(available, key=lambda x: x['name'])
def get_module_by_key(self, module_key: str) -> Optional[Dict]:
"""Get module info by module_key"""
modules = self.scan_available_modules()
for module in modules:
if module['module_key'] == module_key:
return module
return None
def install_module(self, module_key: str) -> Dict:
"""
Install a module:
1. Load manifest
2. Run migrations (create tables)
3. Register in database
4. Set is_installed=1, is_active=1
Returns: {'success': bool, 'message': str}
"""
try:
# Get module info
module = self.get_module_by_key(module_key)
if not module:
return {'success': False, 'message': f'Module {module_key} not found'}
# Check if already installed
if module['is_installed']:
return {'success': False, 'message': f'Module {module_key} is already installed'}
# Load module's migrations
migrations_path = Path(module['path']) / 'migrations.py'
if not migrations_path.exists():
return {'success': False, 'message': 'Module is missing migrations.py'}
# Import migrations module
spec = importlib.util.spec_from_file_location(f"{module_key}_migrations", migrations_path)
migrations_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(migrations_module)
# Run schema installation
print(f"\n📦 Installing module: {module['name']}")
conn = get_db()
# Execute schema SQL
if hasattr(migrations_module, 'get_schema'):
schema_sql = migrations_module.get_schema()
conn.executescript(schema_sql)
print(f" ✅ Database schema created")
# Run module-specific migrations
if hasattr(migrations_module, 'get_migrations'):
migrations = migrations_module.get_migrations()
for version, name, func in migrations:
print(f" Running migration {version}: {name}")
func(conn)
conn.commit()
# Register module in database
existing = query_db('SELECT id FROM module_registry WHERE module_key = ?', [module_key], one=True)
if existing:
execute_db('''
UPDATE module_registry
SET name = ?, version = ?, author = ?, description = ?,
is_installed = 1, is_active = 1, installed_at = CURRENT_TIMESTAMP
WHERE module_key = ?
''', [module['name'], module['version'], module['author'],
module['description'], module_key])
else:
execute_db('''
INSERT INTO module_registry (module_key, name, version, author, description,
is_installed, is_active, installed_at)
VALUES (?, ?, ?, ?, ?, 1, 1, CURRENT_TIMESTAMP)
''', [module_key, module['name'], module['version'],
module['author'], module['description']])
# Also register in old Modules table for compatibility
old_module = query_db('SELECT module_id FROM Modules WHERE module_key = ?', [module_key], one=True)
if not old_module:
execute_db('''
INSERT INTO Modules (module_name, module_key, description, is_active)
VALUES (?, ?, ?, 1)
''', [module['name'], module_key, module['description']])
conn.close()
print(f"✅ Module {module['name']} installed successfully")
return {'success': True, 'message': f'Module {module["name"]} installed successfully'}
except Exception as e:
print(f"❌ Installation failed: {e}")
import traceback
traceback.print_exc()
return {'success': False, 'message': f'Installation failed: {str(e)}'}
def uninstall_module(self, module_key: str, drop_tables: bool = True) -> Dict:
"""
Uninstall a module:
1. Set is_installed=0, is_active=0 in database
2. Optionally drop all module tables
3. Remove from old Modules table
Returns: {'success': bool, 'message': str}
"""
try:
module = self.get_module_by_key(module_key)
if not module:
return {'success': False, 'message': f'Module {module_key} not found'}
if not module['is_installed']:
return {'success': False, 'message': f'Module {module_key} is not installed'}
print(f"\n🗑️ Uninstalling module: {module['name']}")
conn = get_db()
# Drop tables if requested
if drop_tables:
print(f" Dropping database tables...")
# Load migrations to get table names
migrations_path = Path(module['path']) / 'migrations.py'
if migrations_path.exists():
spec = importlib.util.spec_from_file_location(f"{module_key}_migrations", migrations_path)
migrations_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(migrations_module)
# Get schema and extract table names
if hasattr(migrations_module, 'get_schema'):
schema = migrations_module.get_schema()
# Simple regex to find CREATE TABLE statements
import re
tables = re.findall(r'CREATE TABLE IF NOT EXISTS (\w+)', schema)
for table in tables:
try:
conn.execute(f'DROP TABLE IF EXISTS {table}')
print(f" Dropped table: {table}")
except Exception as e:
print(f" Warning: Could not drop {table}: {e}")
# Update module_registry table
execute_db('''
UPDATE module_registry
SET is_installed = 0, is_active = 0
WHERE module_key = ?
''', [module_key])
# Remove from old Modules table
execute_db('DELETE FROM Modules WHERE module_key = ?', [module_key])
# Remove user module assignments
old_module_id = query_db('SELECT module_id FROM Modules WHERE module_key = ?', [module_key], one=True)
if old_module_id:
execute_db('DELETE FROM UserModules WHERE module_id = ?', [old_module_id['module_id']])
conn.commit()
conn.close()
print(f"✅ Module {module['name']} uninstalled successfully")
return {'success': True, 'message': f'Module {module["name"]} uninstalled successfully'}
except Exception as e:
print(f"❌ Uninstallation failed: {e}")
return {'success': False, 'message': f'Uninstallation failed: {str(e)}'}
def activate_module(self, module_key: str) -> Dict:
"""Activate an installed module"""
module = self.get_module_by_key(module_key)
if not module:
return {'success': False, 'message': f'Module {module_key} not found'}
if not module['is_installed']:
return {'success': False, 'message': 'Module must be installed first'}
execute_db('UPDATE module_registry SET is_active = 1 WHERE module_key = ?', [module_key])
execute_db('UPDATE Modules SET is_active = 1 WHERE module_key = ?', [module_key])
return {'success': True, 'message': f'Module {module["name"]} activated'}
def deactivate_module(self, module_key: str) -> Dict:
"""Deactivate a module (keeps it installed)"""
module = self.get_module_by_key(module_key)
if not module:
return {'success': False, 'message': f'Module {module_key} not found'}
execute_db('UPDATE module_registry SET is_active = 0 WHERE module_key = ?', [module_key])
execute_db('UPDATE Modules SET is_active = 0 WHERE module_key = ?', [module_key])
return {'success': True, 'message': f'Module {module["name"]} deactivated'}
def load_active_modules(self, app):
"""
Load all active modules and register their blueprints with Flask app.
Called during app startup.
"""
modules = self.scan_available_modules()
active_modules = [m for m in modules if m['is_installed'] and m['is_active']]
print(f"\n🔌 Loading {len(active_modules)} active module(s)...")
for module in active_modules:
try:
# Import module's __init__.py
init_path = Path(module['path']) / '__init__.py'
if not init_path.exists():
print(f" ⚠️ {module['name']}: Missing __init__.py")
continue
spec = importlib.util.spec_from_file_location(
f"modules.{module['module_key']}",
init_path
)
module_package = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module_package)
# Get blueprint from create_blueprint()
if hasattr(module_package, 'create_blueprint'):
blueprint = module_package.create_blueprint()
app.register_blueprint(blueprint)
print(f"{module['name']} loaded at {module.get('routes_prefix', '/unknown')}")
else:
print(f" ⚠️ {module['name']}: Missing create_blueprint() function")
except Exception as e:
print(f" ❌ Failed to load {module['name']}: {e}")
import traceback
traceback.print_exc()
print("✅ Module loading complete\n")
# Global instance
manager = ModuleManager()
def get_module_manager() -> ModuleManager:
"""Get the global module manager instance"""
return manager