370 lines
15 KiB
Python
370 lines
15 KiB
Python
"""
|
|
ScanLook Module Manager
|
|
Handles module discovery, installation, uninstallation, and activation
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import sqlite3
|
|
import importlib.util
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
|
|
|
|
MODULES_DIR = Path(__file__).parent / 'modules'
|
|
DB_PATH = Path(__file__).parent / 'database' / 'scanlook.db'
|
|
|
|
|
|
def get_db():
|
|
"""Get database connection (standalone, no Flask context needed)"""
|
|
conn = sqlite3.connect(str(DB_PATH))
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def query_db(query, args=(), one=False):
|
|
"""Query database and return results"""
|
|
conn = get_db()
|
|
cur = conn.execute(query, args)
|
|
rv = cur.fetchall()
|
|
conn.close()
|
|
return (rv[0] if rv else None) if one else rv
|
|
|
|
|
|
def execute_db(query, args=()):
|
|
"""Execute database command and return lastrowid"""
|
|
conn = get_db()
|
|
cur = conn.execute(query, args)
|
|
conn.commit()
|
|
last_id = cur.lastrowid
|
|
conn.close()
|
|
return last_id
|
|
|
|
|
|
class ModuleManager:
|
|
"""Manages ScanLook modules"""
|
|
|
|
def __init__(self):
|
|
self.modules_dir = MODULES_DIR
|
|
self._ensure_modules_table()
|
|
|
|
def _ensure_modules_table(self):
|
|
"""Ensure the module_registry table exists in the database"""
|
|
conn = get_db()
|
|
conn.execute('''
|
|
CREATE TABLE IF NOT EXISTS module_registry (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
module_key TEXT UNIQUE NOT NULL,
|
|
name TEXT NOT NULL,
|
|
version TEXT NOT NULL,
|
|
author TEXT,
|
|
description TEXT,
|
|
is_installed INTEGER DEFAULT 0,
|
|
is_active INTEGER DEFAULT 0,
|
|
installed_at TEXT,
|
|
config_json TEXT
|
|
)
|
|
''')
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def scan_available_modules(self) -> List[Dict]:
|
|
"""
|
|
Scan the /modules directory for available modules.
|
|
Returns list of module info dicts from manifest.json files.
|
|
"""
|
|
available = []
|
|
|
|
if not self.modules_dir.exists():
|
|
return available
|
|
|
|
for item in self.modules_dir.iterdir():
|
|
if not item.is_dir():
|
|
continue
|
|
|
|
manifest_path = item / 'manifest.json'
|
|
if not manifest_path.exists():
|
|
continue
|
|
|
|
try:
|
|
with open(manifest_path, 'r') as f:
|
|
manifest = json.load(f)
|
|
|
|
# Validate required fields
|
|
required = ['module_key', 'name', 'version', 'author', 'description']
|
|
if not all(field in manifest for field in required):
|
|
print(f"⚠️ Invalid manifest in {item.name}: missing required fields")
|
|
continue
|
|
|
|
# Check installation status from database
|
|
db_module = query_db(
|
|
'SELECT is_installed, is_active FROM module_registry WHERE module_key = ?',
|
|
[manifest['module_key']],
|
|
one=True
|
|
)
|
|
|
|
manifest['is_installed'] = db_module['is_installed'] if db_module else False
|
|
manifest['is_active'] = db_module['is_active'] if db_module else False
|
|
manifest['path'] = str(item)
|
|
|
|
available.append(manifest)
|
|
|
|
except json.JSONDecodeError as e:
|
|
print(f"⚠️ Invalid JSON in {manifest_path}: {e}")
|
|
continue
|
|
except Exception as e:
|
|
print(f"⚠️ Error reading manifest from {item.name}: {e}")
|
|
continue
|
|
|
|
return sorted(available, key=lambda x: x['name'])
|
|
|
|
def get_module_by_key(self, module_key: str) -> Optional[Dict]:
|
|
"""Get module info by module_key"""
|
|
modules = self.scan_available_modules()
|
|
for module in modules:
|
|
if module['module_key'] == module_key:
|
|
return module
|
|
return None
|
|
|
|
def install_module(self, module_key: str) -> Dict:
|
|
"""
|
|
Install a module:
|
|
1. Load manifest
|
|
2. Run migrations (create tables)
|
|
3. Register in database
|
|
4. Set is_installed=1, is_active=1
|
|
|
|
Returns: {'success': bool, 'message': str}
|
|
"""
|
|
try:
|
|
# Get module info
|
|
module = self.get_module_by_key(module_key)
|
|
if not module:
|
|
return {'success': False, 'message': f'Module {module_key} not found'}
|
|
|
|
# Check if already installed
|
|
if module['is_installed']:
|
|
return {'success': False, 'message': f'Module {module_key} is already installed'}
|
|
|
|
# Load module's migrations
|
|
migrations_path = Path(module['path']) / 'migrations.py'
|
|
if not migrations_path.exists():
|
|
return {'success': False, 'message': 'Module is missing migrations.py'}
|
|
|
|
# Import migrations module
|
|
spec = importlib.util.spec_from_file_location(f"{module_key}_migrations", migrations_path)
|
|
migrations_module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(migrations_module)
|
|
|
|
# Run schema installation
|
|
print(f"\n📦 Installing module: {module['name']}")
|
|
|
|
conn = get_db()
|
|
|
|
# Execute schema SQL
|
|
if hasattr(migrations_module, 'get_schema'):
|
|
schema_sql = migrations_module.get_schema()
|
|
conn.executescript(schema_sql)
|
|
print(f" ✅ Database schema created")
|
|
|
|
# Run module-specific migrations
|
|
if hasattr(migrations_module, 'get_migrations'):
|
|
migrations = migrations_module.get_migrations()
|
|
for version, name, func in migrations:
|
|
print(f" Running migration {version}: {name}")
|
|
func(conn)
|
|
|
|
conn.commit()
|
|
|
|
# Register module in database
|
|
existing = query_db('SELECT id FROM module_registry WHERE module_key = ?', [module_key], one=True)
|
|
|
|
if existing:
|
|
execute_db('''
|
|
UPDATE module_registry
|
|
SET name = ?, version = ?, author = ?, description = ?,
|
|
is_installed = 1, is_active = 1, installed_at = CURRENT_TIMESTAMP
|
|
WHERE module_key = ?
|
|
''', [module['name'], module['version'], module['author'],
|
|
module['description'], module_key])
|
|
else:
|
|
execute_db('''
|
|
INSERT INTO module_registry (module_key, name, version, author, description,
|
|
is_installed, is_active, installed_at)
|
|
VALUES (?, ?, ?, ?, ?, 1, 1, CURRENT_TIMESTAMP)
|
|
''', [module_key, module['name'], module['version'],
|
|
module['author'], module['description']])
|
|
|
|
# Also register in old Modules table for compatibility
|
|
old_module = query_db('SELECT module_id FROM Modules WHERE module_key = ?', [module_key], one=True)
|
|
if not old_module:
|
|
execute_db('''
|
|
INSERT INTO Modules (module_name, module_key, description, icon, is_active)
|
|
VALUES (?, ?, ?, ?, 1)
|
|
''', [module['name'], module_key, module['description'], module.get('icon', '')])
|
|
|
|
|
|
# Also register in old Modules table for compatibility
|
|
old_module = query_db('SELECT module_id FROM Modules WHERE module_key = ?', [module_key], one=True)
|
|
if not old_module:
|
|
execute_db('''
|
|
INSERT INTO Modules (module_name, module_key, description, is_active)
|
|
VALUES (?, ?, ?, 1)
|
|
''', [module['name'], module_key, module['description']])
|
|
|
|
conn.close()
|
|
|
|
print(f"✅ Module {module['name']} installed successfully")
|
|
return {'success': True, 'message': f'Module {module["name"]} installed successfully'}
|
|
|
|
except Exception as e:
|
|
print(f"❌ Installation failed: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return {'success': False, 'message': f'Installation failed: {str(e)}'}
|
|
|
|
|
|
|
|
def uninstall_module(self, module_key: str, drop_tables: bool = True) -> Dict:
|
|
"""
|
|
Uninstall a module:
|
|
1. Set is_installed=0, is_active=0 in database
|
|
2. Optionally drop all module tables
|
|
3. Remove from old Modules table
|
|
|
|
Returns: {'success': bool, 'message': str}
|
|
"""
|
|
try:
|
|
module = self.get_module_by_key(module_key)
|
|
if not module:
|
|
return {'success': False, 'message': f'Module {module_key} not found'}
|
|
|
|
if not module['is_installed']:
|
|
return {'success': False, 'message': f'Module {module_key} is not installed'}
|
|
|
|
print(f"\n🗑️ Uninstalling module: {module['name']}")
|
|
|
|
conn = get_db()
|
|
|
|
# Drop tables if requested
|
|
if drop_tables:
|
|
print(f" Dropping database tables...")
|
|
# Load migrations to get table names
|
|
migrations_path = Path(module['path']) / 'migrations.py'
|
|
if migrations_path.exists():
|
|
spec = importlib.util.spec_from_file_location(f"{module_key}_migrations", migrations_path)
|
|
migrations_module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(migrations_module)
|
|
|
|
# Get schema and extract table names
|
|
if hasattr(migrations_module, 'get_schema'):
|
|
schema = migrations_module.get_schema()
|
|
# Simple regex to find CREATE TABLE statements
|
|
import re
|
|
tables = re.findall(r'CREATE TABLE IF NOT EXISTS (\w+)', schema)
|
|
|
|
for table in tables:
|
|
try:
|
|
conn.execute(f'DROP TABLE IF EXISTS {table}')
|
|
print(f" Dropped table: {table}")
|
|
except Exception as e:
|
|
print(f" Warning: Could not drop {table}: {e}")
|
|
|
|
# Update module_registry table
|
|
execute_db('''
|
|
UPDATE module_registry
|
|
SET is_installed = 0, is_active = 0
|
|
WHERE module_key = ?
|
|
''', [module_key])
|
|
|
|
# Remove from old Modules table
|
|
execute_db('DELETE FROM Modules WHERE module_key = ?', [module_key])
|
|
|
|
# Remove user module assignments
|
|
old_module_id = query_db('SELECT module_id FROM Modules WHERE module_key = ?', [module_key], one=True)
|
|
if old_module_id:
|
|
execute_db('DELETE FROM UserModules WHERE module_id = ?', [old_module_id['module_id']])
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
print(f"✅ Module {module['name']} uninstalled successfully")
|
|
return {'success': True, 'message': f'Module {module["name"]} uninstalled successfully'}
|
|
|
|
except Exception as e:
|
|
print(f"❌ Uninstallation failed: {e}")
|
|
return {'success': False, 'message': f'Uninstallation failed: {str(e)}'}
|
|
|
|
def activate_module(self, module_key: str) -> Dict:
|
|
"""Activate an installed module"""
|
|
module = self.get_module_by_key(module_key)
|
|
if not module:
|
|
return {'success': False, 'message': f'Module {module_key} not found'}
|
|
|
|
if not module['is_installed']:
|
|
return {'success': False, 'message': 'Module must be installed first'}
|
|
|
|
execute_db('UPDATE module_registry SET is_active = 1 WHERE module_key = ?', [module_key])
|
|
execute_db('UPDATE Modules SET is_active = 1 WHERE module_key = ?', [module_key])
|
|
|
|
return {'success': True, 'message': f'Module {module["name"]} activated'}
|
|
|
|
def deactivate_module(self, module_key: str) -> Dict:
|
|
"""Deactivate a module (keeps it installed)"""
|
|
module = self.get_module_by_key(module_key)
|
|
if not module:
|
|
return {'success': False, 'message': f'Module {module_key} not found'}
|
|
|
|
execute_db('UPDATE module_registry SET is_active = 0 WHERE module_key = ?', [module_key])
|
|
execute_db('UPDATE Modules SET is_active = 0 WHERE module_key = ?', [module_key])
|
|
|
|
return {'success': True, 'message': f'Module {module["name"]} deactivated'}
|
|
|
|
def load_active_modules(self, app):
|
|
"""
|
|
Load all active modules and register their blueprints with Flask app.
|
|
Called during app startup.
|
|
"""
|
|
modules = self.scan_available_modules()
|
|
active_modules = [m for m in modules if m['is_installed'] and m['is_active']]
|
|
|
|
print(f"\n🔌 Loading {len(active_modules)} active module(s)...")
|
|
|
|
for module in active_modules:
|
|
try:
|
|
# Import module's __init__.py
|
|
init_path = Path(module['path']) / '__init__.py'
|
|
if not init_path.exists():
|
|
print(f" ⚠️ {module['name']}: Missing __init__.py")
|
|
continue
|
|
|
|
spec = importlib.util.spec_from_file_location(
|
|
f"modules.{module['module_key']}",
|
|
init_path
|
|
)
|
|
module_package = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(module_package)
|
|
|
|
# Get blueprint from create_blueprint()
|
|
if hasattr(module_package, 'create_blueprint'):
|
|
blueprint = module_package.create_blueprint()
|
|
app.register_blueprint(blueprint)
|
|
print(f" ✅ {module['name']} loaded at {module.get('routes_prefix', '/unknown')}")
|
|
else:
|
|
print(f" ⚠️ {module['name']}: Missing create_blueprint() function")
|
|
|
|
except Exception as e:
|
|
print(f" ❌ Failed to load {module['name']}: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
print("✅ Module loading complete\n")
|
|
|
|
|
|
# Global instance
|
|
manager = ModuleManager()
|
|
|
|
|
|
def get_module_manager() -> ModuleManager:
|
|
"""Get the global module manager instance"""
|
|
return manager |