Files
ScanLook/module_manager.py
Javier 363295762a feat: Implement Smart Router workflow with User Input and Duplicate Logic (v0.18.0)
Major update to the scanning engine to support "Pause & Resume" workflows.
The system can now halt execution to ask for user input (e.g. Weight) and resume
processing seamlessly.

Key Changes:
- Backend (Global Actions): Added `OPEN_FORM` action type to pause pipeline and request manual input.
- Backend (Routes): Updated `scan_lot` to handle `extra_data` payloads, allowing the pipeline to resume after user input.
- Backend (Logic): Implemented `confirm_duplicate` gatekeeper to handle "Warn vs Block" logic dynamically.
- Frontend (JS): Added `processSmartScan` to handle router signals (Open Modal, Warn Duplicate).
- Frontend (JS): Added `saveSmartScanData` to send original barcode + new form data back to the engine.
- UI: Fixed modal ID/Name conflicts (forcing use of `name` attribute for DB compatibility).
- UI: Restored missing "Cancel" button to Details Modal.
- Config: Added "User Input" rule type to the Rule Editor.

Ver: 0.18.0
2026-02-09 00:34:41 -06:00

388 lines
16 KiB
Python

"""
ScanLook Module Manager
Handles module discovery, installation, uninstallation, and activation
"""
import os
import json
import sqlite3
import importlib.util
from pathlib import Path
from typing import List, Dict, Optional
MODULES_DIR = Path(__file__).parent / 'modules'
DB_PATH = Path(__file__).parent / 'database' / 'scanlook.db'
def get_db():
"""Get database connection (standalone, no Flask context needed)"""
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def query_db(query, args=(), one=False):
"""Query database and return results"""
conn = get_db()
cur = conn.execute(query, args)
rv = cur.fetchall()
conn.close()
return (rv[0] if rv else None) if one else rv
def execute_db(query, args=()):
"""Execute database command and return lastrowid"""
conn = get_db()
cur = conn.execute(query, args)
conn.commit()
last_id = cur.lastrowid
conn.close()
return last_id
class ModuleManager:
"""Manages ScanLook modules"""
def __init__(self):
self.modules_dir = MODULES_DIR
self._ensure_modules_table()
def _ensure_modules_table(self):
"""Ensure the module_registry table exists in the database"""
conn = get_db()
conn.execute('''
CREATE TABLE IF NOT EXISTS module_registry (
id INTEGER PRIMARY KEY AUTOINCREMENT,
module_key TEXT UNIQUE NOT NULL,
name TEXT NOT NULL,
version TEXT NOT NULL,
author TEXT,
description TEXT,
is_installed INTEGER DEFAULT 0,
is_active INTEGER DEFAULT 0,
installed_at TEXT,
config_json TEXT
)
''')
conn.commit()
conn.close()
def scan_available_modules(self) -> List[Dict]:
"""
Scan the /modules directory for available modules.
Returns list of module info dicts from manifest.json files.
"""
available = []
if not self.modules_dir.exists():
return available
for item in self.modules_dir.iterdir():
if not item.is_dir():
continue
manifest_path = item / 'manifest.json'
if not manifest_path.exists():
continue
try:
with open(manifest_path, 'r') as f:
manifest = json.load(f)
# Validate required fields
required = ['module_key', 'name', 'version', 'author', 'description']
if not all(field in manifest for field in required):
print(f"⚠️ Invalid manifest in {item.name}: missing required fields")
continue
# Check installation status from database
db_module = query_db(
'SELECT is_installed, is_active FROM module_registry WHERE module_key = ?',
[manifest['module_key']],
one=True
)
manifest['is_installed'] = db_module['is_installed'] if db_module else False
manifest['is_active'] = db_module['is_active'] if db_module else False
manifest['path'] = str(item)
available.append(manifest)
except json.JSONDecodeError as e:
print(f"⚠️ Invalid JSON in {manifest_path}: {e}")
continue
except Exception as e:
print(f"⚠️ Error reading manifest from {item.name}: {e}")
continue
return sorted(available, key=lambda x: x['name'])
def get_module_by_key(self, module_key: str) -> Optional[Dict]:
"""Get module info by module_key"""
modules = self.scan_available_modules()
for module in modules:
if module['module_key'] == module_key:
return module
return None
def install_module(self, module_key: str) -> Dict:
"""
Install a module:
1. Load manifest
2. Run migrations (create tables)
3. Register in database
4. Set is_installed=1, is_active=1
Returns: {'success': bool, 'message': str}
"""
try:
# Get module info
module = self.get_module_by_key(module_key)
if not module:
return {'success': False, 'message': f'Module {module_key} not found'}
# Check if already installed
if module['is_installed']:
return {'success': False, 'message': f'Module {module_key} is already installed'}
# Load module's migrations
migrations_path = Path(module['path']) / 'migrations.py'
if not migrations_path.exists():
return {'success': False, 'message': 'Module is missing migrations.py'}
# Import migrations module
spec = importlib.util.spec_from_file_location(f"{module_key}_migrations", migrations_path)
migrations_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(migrations_module)
# Run schema installation
print(f"\n📦 Installing module: {module['name']}")
conn = get_db()
# Execute schema SQL
if hasattr(migrations_module, 'get_schema'):
schema_sql = migrations_module.get_schema()
conn.executescript(schema_sql)
print(f" ✅ Database schema created")
# Run module-specific migrations
if hasattr(migrations_module, 'get_migrations'):
migrations = migrations_module.get_migrations()
for version, name, func in migrations:
print(f" Running migration {version}: {name}")
func(conn)
conn.commit()
# Register module in database
existing = query_db('SELECT id FROM module_registry WHERE module_key = ?', [module_key], one=True)
if existing:
execute_db('''
UPDATE module_registry
SET name = ?, version = ?, author = ?, description = ?,
is_installed = 1, is_active = 1, installed_at = CURRENT_TIMESTAMP
WHERE module_key = ?
''', [module['name'], module['version'], module['author'],
module['description'], module_key])
else:
execute_db('''
INSERT INTO module_registry (module_key, name, version, author, description,
is_installed, is_active, installed_at)
VALUES (?, ?, ?, ?, ?, 1, 1, CURRENT_TIMESTAMP)
''', [module_key, module['name'], module['version'],
module['author'], module['description']])
# Also register in old Modules table for compatibility
old_module = query_db('SELECT module_id FROM Modules WHERE module_key = ?', [module_key], one=True)
if not old_module:
execute_db('''
INSERT INTO Modules (module_name, module_key, description, icon, is_active)
VALUES (?, ?, ?, ?, 1)
''', [module['name'], module_key, module['description'], module.get('icon', '')])
# Also register in old Modules table for compatibility
old_module = query_db('SELECT module_id FROM Modules WHERE module_key = ?', [module_key], one=True)
if not old_module:
execute_db('''
INSERT INTO Modules (module_name, module_key, description, is_active)
VALUES (?, ?, ?, 1)
''', [module['name'], module_key, module['description']])
conn.close()
print(f"✅ Module {module['name']} installed successfully")
return {'success': True, 'message': f'Module {module["name"]} installed successfully'}
except Exception as e:
print(f"❌ Installation failed: {e}")
import traceback
traceback.print_exc()
return {'success': False, 'message': f'Installation failed: {str(e)}'}
def uninstall_module(self, module_key: str, drop_tables: bool = True) -> Dict:
"""
Uninstall a module:
1. Set is_installed=0, is_active=0 in database
2. Optionally drop all module tables
3. Remove from old Modules table
Returns: {'success': bool, 'message': str}
"""
try:
module = self.get_module_by_key(module_key)
if not module:
return {'success': False, 'message': f'Module {module_key} not found'}
if not module['is_installed']:
return {'success': False, 'message': f'Module {module_key} is not installed'}
print(f"\n🗑️ Uninstalling module: {module['name']}")
conn = get_db()
# Drop tables if requested
if drop_tables:
print(f" Dropping database tables...")
# Load migrations to get table names
migrations_path = Path(module['path']) / 'migrations.py'
if migrations_path.exists():
spec = importlib.util.spec_from_file_location(f"{module_key}_migrations", migrations_path)
migrations_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(migrations_module)
# Get schema and extract table names
if hasattr(migrations_module, 'get_schema'):
schema = migrations_module.get_schema()
# Simple regex to find CREATE TABLE statements
import re
tables = re.findall(r'CREATE TABLE IF NOT EXISTS (\w+)', schema)
for table in tables:
try:
conn.execute(f'DROP TABLE IF EXISTS {table}')
print(f" Dropped table: {table}")
except Exception as e:
print(f" Warning: Could not drop {table}: {e}")
# Update module_registry table
execute_db('''
UPDATE module_registry
SET is_installed = 0, is_active = 0
WHERE module_key = ?
''', [module_key])
# Remove from old Modules table
execute_db('DELETE FROM Modules WHERE module_key = ?', [module_key])
# Remove user module assignments
old_module_id = query_db('SELECT module_id FROM Modules WHERE module_key = ?', [module_key], one=True)
if old_module_id:
execute_db('DELETE FROM UserModules WHERE module_id = ?', [old_module_id['module_id']])
conn.commit()
conn.close()
print(f"✅ Module {module['name']} uninstalled successfully")
return {'success': True, 'message': f'Module {module["name"]} uninstalled successfully'}
except Exception as e:
print(f"❌ Uninstallation failed: {e}")
return {'success': False, 'message': f'Uninstallation failed: {str(e)}'}
def activate_module(self, module_key: str) -> Dict:
"""Activate an installed module"""
module = self.get_module_by_key(module_key)
if not module:
return {'success': False, 'message': f'Module {module_key} not found'}
if not module['is_installed']:
return {'success': False, 'message': 'Module must be installed first'}
execute_db('UPDATE module_registry SET is_active = 1 WHERE module_key = ?', [module_key])
execute_db('UPDATE Modules SET is_active = 1 WHERE module_key = ?', [module_key])
return {'success': True, 'message': f'Module {module["name"]} activated'}
def deactivate_module(self, module_key: str) -> Dict:
"""Deactivate a module (keeps it installed)"""
module = self.get_module_by_key(module_key)
if not module:
return {'success': False, 'message': f'Module {module_key} not found'}
execute_db('UPDATE module_registry SET is_active = 0 WHERE module_key = ?', [module_key])
execute_db('UPDATE Modules SET is_active = 0 WHERE module_key = ?', [module_key])
return {'success': True, 'message': f'Module {module["name"]} deactivated'}
def load_active_modules(self, app):
"""
Load all active modules, run their migrations, and register blueprints.
Called during app startup.
"""
modules = self.scan_available_modules()
active_modules = [m for m in modules if m['is_installed'] and m['is_active']]
print(f"\n🔌 Loading {len(active_modules)} active module(s)...")
for module in active_modules:
try:
# --- NEW: Run Migrations on Startup ---
migrations_path = Path(module['path']) / 'migrations.py'
if migrations_path.exists():
# 1. Dynamically load the migrations.py file
spec_mig = importlib.util.spec_from_file_location(f"{module['module_key']}_mig", migrations_path)
mig_mod = importlib.util.module_from_spec(spec_mig)
spec_mig.loader.exec_module(mig_mod)
# 2. Run the migrations
if hasattr(mig_mod, 'get_migrations'):
conn = get_db()
for version, name, func in mig_mod.get_migrations():
# Your migrations are written safely (checking IF EXISTS),
# so running them on every boot is the correct Dev workflow.
func(conn)
conn.commit() # <--- CRITICAL: Saves the changes to the DB
conn.close()
# --------------------------------------
# Import module's __init__.py
init_path = Path(module['path']) / '__init__.py'
if not init_path.exists():
print(f" ⚠️ {module['name']}: Missing __init__.py")
continue
spec = importlib.util.spec_from_file_location(
f"modules.{module['module_key']}",
init_path
)
module_package = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module_package)
# Get blueprint from create_blueprint()
if hasattr(module_package, 'create_blueprint'):
blueprint = module_package.create_blueprint()
app.register_blueprint(blueprint)
print(f"{module['name']} loaded at {module.get('routes_prefix', '/unknown')}")
else:
print(f" ⚠️ {module['name']}: Missing create_blueprint() function")
except Exception as e:
print(f" ❌ Failed to load {module['name']}: {e}")
import traceback
traceback.print_exc()
print("✅ Module loading complete\n")
# Global instance
manager = ModuleManager()
def get_module_manager() -> ModuleManager:
"""Get the global module manager instance"""
return manager