Files
ScanLook/app.py

414 lines
14 KiB
Python

"""
ScanLook - Modular Inventory Management System
Flask Application
Production-Ready Release with Module System
"""
from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify, send_from_directory
from werkzeug.security import check_password_hash
import os
from datetime import timedelta
# Initialize App FIRST to prevent circular import errors
# (the project imports below are intentionally placed after `app` is created).
app = Flask(__name__)
# Now import your custom modules
from db import query_db, execute_db, get_db
from blueprints.users import users_bp
from utils import login_required
# Register Core Blueprints (non-modular)
app.register_blueprint(users_bp)
# V1.0: Use environment variable for production, fallback to demo key for development
# NOTE(review): the fallback value is a constant committed to the source; when
# SCANLOOK_SECRET_KEY is unset, session cookies are signed with this known key.
# Confirm every production deployment sets the environment variable.
app.secret_key = os.environ.get('SCANLOOK_SECRET_KEY', 'scanlook-demo-key-replace-for-production')
# Path of the database file: <directory of this file>/database/scanlook.db
app.config['DATABASE'] = os.path.join(os.path.dirname(__file__), 'database', 'scanlook.db')
# Session timeout: 1 hour (auto-logout after idle period)
# login() marks the session permanent so that this lifetime applies to it.
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=1)
# 1. Define the version
APP_VERSION = '0.17.3' # Bumped version for modular architecture
# 2. Make the version available to every template
@app.context_processor
def inject_version():
    """Expose ``APP_VERSION`` to all templates under the name ``version``."""
    return {'version': APP_VERSION}
# Auto-initialize database if it doesn't exist.
# The path is taken from app.config['DATABASE'] (set above) instead of being
# rebuilt here, so the existence check cannot drift from the configured location.
db_path = app.config['DATABASE']
if not os.path.exists(db_path):
    print("Database not found, initializing...")
    # Imported lazily: the initializer is only needed on the very first start.
    from database.init_db import init_database, create_default_users
    init_database()
    create_default_users()
    print("Database initialized!")
    print("📦 Install modules from /admin/modules")

# Run migrations to apply any pending database changes.
# This runs on every start, after the database file is known to exist.
from migrations import run_migrations
run_migrations()

# Load and register active modules.
# `module_manager` stays a module-level name because the admin routes use it.
from module_manager import get_module_manager
module_manager = get_module_manager()
module_manager.load_active_modules(app)
# ==================== ROUTES: AUTHENTICATION ====================
@app.route('/')
def index():
    """Landing page: send logged-in users to ``home``, everyone else to ``login``."""
    destination = 'home' if 'user_id' in session else 'login'
    return redirect(url_for(destination))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate submitted credentials.

    GET renders ``login.html``. POST looks up an active user by the submitted
    username and checks the submitted password against the stored hash.

    On success the session is reset, repopulated with the user's id,
    username, full name and role, marked permanent so the configured session
    lifetime applies, and the browser is redirected to ``home``.

    On failure a generic error is flashed (the same message for an unknown
    user and a wrong password) and the form is rendered again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form.get('username', '').strip()
    password = request.form.get('password', '')
    user = query_db('SELECT * FROM Users WHERE username = ? AND is_active = 1', [username], one=True)

    if not (user and check_password_hash(user['password'], password)):
        flash('Invalid username or password', 'danger')
        return render_template('login.html')

    # Start from an empty session so nothing stored by a previous login in the
    # same browser (another user's data, stale keys) leaks into this one.
    session.clear()
    session.permanent = True  # Enable session timeout (must follow clear())
    session['user_id'] = user['user_id']
    session['username'] = user['username']
    session['full_name'] = user['full_name']
    session['role'] = user['role']
    flash(f'Welcome back, {user["full_name"]}!', 'success')
    return redirect(url_for('home'))
@app.route('/logout')
def logout():
    """Clear the session, flash a notice and redirect to the login page."""
    session.clear()
    flash('You have been logged out', 'info')
    login_url = url_for('login')
    return redirect(login_url)
# ==================== ROUTES: HOME ====================
@app.route('/home')
@login_required
def home():
    """Module selection page listing the active modules assigned to the user."""
    # Active modules linked to the user through UserModules, in display order.
    accessible_modules_sql = '''
        SELECT m.module_id, m.module_name, m.module_key, m.description, m.icon
        FROM Modules m
        JOIN UserModules um ON m.module_id = um.module_id
        WHERE um.user_id = ? AND m.is_active = 1
        ORDER BY m.display_order
    '''
    current_user_id = session.get('user_id')
    accessible = query_db(accessible_modules_sql, [current_user_id])
    return render_template('home.html', modules=accessible)
# ==================== ROUTES: ADMIN DASHBOARD ====================
@app.route('/admin')
@login_required
def admin_dashboard():
    """Admin dashboard listing the active modules assigned to the current user.

    Users whose role is neither ``owner`` nor ``admin`` get an error flash
    and are redirected to ``home``.
    """
    if session.get('role') not in ['owner', 'admin']:
        flash('Access denied. Admin role required.', 'danger')
        return redirect(url_for('home'))

    # Active modules linked to the user through UserModules, in display order.
    accessible_modules_sql = '''
        SELECT m.module_id, m.module_name, m.module_key, m.description, m.icon
        FROM Modules m
        JOIN UserModules um ON m.module_id = um.module_id
        WHERE um.user_id = ? AND m.is_active = 1
        ORDER BY m.display_order
    '''
    accessible = query_db(accessible_modules_sql, [session.get('user_id')])
    return render_template('admin_dashboard.html', modules=accessible)
# ==================== MODULE MANAGER UI ====================
@app.route('/admin/modules')
@login_required
def module_manager_ui():
    """Render the module manager page with the result of a module scan.

    Restricted to the ``owner`` and ``admin`` roles; other users get an
    error flash and are redirected to ``home``.
    """
    has_admin_role = session.get('role') in ['owner', 'admin']
    if not has_admin_role:
        flash('Access denied. Admin role required.', 'danger')
        return redirect(url_for('home'))

    available = module_manager.scan_available_modules()
    return render_template('module_manager.html', modules=available)
@app.route('/admin/modules/<module_key>/install', methods=['POST'])
@login_required
def install_module(module_key):
    """Install the module identified by ``module_key`` and return the result as JSON.

    A successful result is tagged with ``restart_required = True``.
    Non-admin callers receive a 403 JSON response.
    """
    if session.get('role') not in ['owner', 'admin']:
        return jsonify({'success': False, 'message': 'Access denied'}), 403

    outcome = module_manager.install_module(module_key)
    if outcome['success']:
        # Tell the client that a restart is needed after a successful install.
        outcome['restart_required'] = True
    return jsonify(outcome)
@app.route('/admin/modules/<module_key>/uninstall', methods=['POST'])
@login_required
def uninstall_module(module_key):
    """Uninstall the module identified by ``module_key`` and return the result as JSON.

    The module's tables are dropped unless the request carries the query
    parameter ``keep_data=true``. Non-admin callers receive a 403 JSON response.
    """
    if session.get('role') not in ['owner', 'admin']:
        return jsonify({'success': False, 'message': 'Access denied'}), 403

    # Only the exact string 'true' preserves the data; anything else drops tables.
    wants_data_kept = request.args.get('keep_data') == 'true'
    outcome = module_manager.uninstall_module(module_key, drop_tables=not wants_data_kept)
    return jsonify(outcome)
@app.route('/admin/modules/<module_key>/activate', methods=['POST'])
@login_required
def activate_module(module_key):
    """Activate the module identified by ``module_key``; admin roles only.

    Returns the module manager's result as JSON, or a 403 JSON response
    when the session role is not ``owner`` or ``admin``.
    """
    has_admin_role = session.get('role') in ['owner', 'admin']
    if not has_admin_role:
        return jsonify({'success': False, 'message': 'Access denied'}), 403

    outcome = module_manager.activate_module(module_key)
    return jsonify(outcome)
@app.route('/admin/modules/<module_key>/deactivate', methods=['POST'])
@login_required
def deactivate_module(module_key):
    """Deactivate the module identified by ``module_key``; admin roles only.

    Returns the module manager's result as JSON, or a 403 JSON response
    when the session role is not ``owner`` or ``admin``.
    """
    has_admin_role = session.get('role') in ['owner', 'admin']
    if not has_admin_role:
        return jsonify({'success': False, 'message': 'Access denied'}), 403

    outcome = module_manager.deactivate_module(module_key)
    return jsonify(outcome)
@app.route('/admin/restart', methods=['POST'])
@login_required
def restart_server():
    """Restart or reload the server process on behalf of an admin.

    When the ``SERVER_SOFTWARE`` environment variable mentions gunicorn,
    SIGHUP is sent to the parent process (expected to be the Gunicorn
    master) to trigger a reload. Otherwise the current process is replaced
    via ``os.execv`` from a background thread after a short delay.

    Returns a JSON object with ``success`` and ``message``; 403 for callers
    whose role is not ``owner`` or ``admin``.
    """
    if session.get('role') not in ['owner', 'admin']:
        return jsonify({'success': False, 'message': 'Access denied'}), 403

    try:
        import signal
        import os
        import sys

        under_gunicorn = 'gunicorn' in os.environ.get('SERVER_SOFTWARE', '')
        if under_gunicorn:
            # Gunicorn: HUP the parent (master) process for a graceful reload
            os.kill(os.getppid(), signal.SIGHUP)
            return jsonify({'success': True, 'message': 'Server reloading...'})

        # Flask dev server: re-exec this process with the same arguments
        def restart():
            import time
            # Brief pause before exec; presumably lets the response go out first.
            time.sleep(0.5)
            # argv[0] is the literal 'python' on Windows, the interpreter path elsewhere
            argv0 = 'python' if os.name == 'nt' else sys.executable
            os.execv(sys.executable, [argv0] + sys.argv)

        from threading import Thread
        Thread(target=restart).start()
        return jsonify({'success': True, 'message': 'Server restarting...'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'Restart failed: {str(e)}'})
"""
Add this route to app.py
"""
def _find_module_root(extract_dir):
    """Return the directory under *extract_dir* that contains manifest.json.

    The manifest may sit directly in *extract_dir* (archive root) or inside
    one of its first-level subdirectories. Returns ``None`` if neither holds it.
    """
    if os.path.exists(os.path.join(extract_dir, 'manifest.json')):
        return extract_dir
    for item in os.listdir(extract_dir):
        item_path = os.path.join(extract_dir, item)
        if os.path.isdir(item_path) and os.path.exists(os.path.join(item_path, 'manifest.json')):
            return item_path
    return None


@app.route('/admin/modules/upload', methods=['POST'])
@login_required
def upload_module():
    """Upload a module ZIP package and copy it into the ``modules`` directory.

    Expects a multipart form field ``module_file`` holding a ``.zip`` archive
    whose root (or a first-level folder) contains ``manifest.json`` and
    ``__init__.py``. The manifest must be a JSON object with the fields
    ``module_key``, ``name``, ``version`` and ``author``.

    ``module_key`` becomes the name of the target directory, so it is
    restricted to letters, digits, underscores and hyphens. This prevents a
    crafted manifest from writing outside the ``modules`` directory.

    Returns:
        A JSON response with ``success`` and ``message`` keys. The status is
        403 when the session role is not ``owner`` or ``admin``; all other
        outcomes, including validation failures, use the default status.
    """
    # Imported before the ``try`` so the ``except`` clauses below can always
    # resolve ``zipfile.BadZipFile`` and ``json.JSONDecodeError``.
    import json
    import re
    import shutil
    import tempfile
    import traceback
    import zipfile

    if session.get('role') not in ['owner', 'admin']:
        return jsonify({'success': False, 'message': 'Access denied'}), 403

    if 'module_file' not in request.files:
        return jsonify({'success': False, 'message': 'No file uploaded'})

    file = request.files['module_file']
    # The filename can be missing entirely; treat that like an empty selection.
    filename = file.filename or ''
    if filename == '':
        return jsonify({'success': False, 'message': 'No file selected'})
    if not filename.lower().endswith('.zip'):
        return jsonify({'success': False, 'message': 'File must be a ZIP archive'})

    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            # Save uploaded file
            zip_path = os.path.join(temp_dir, 'module.zip')
            file.save(zip_path)

            # Extract into a dedicated subdirectory so the uploaded archive
            # itself is never part of the tree that gets copied below.
            extract_dir = os.path.join(temp_dir, 'extracted')
            os.mkdir(extract_dir)
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

            # Find the module folder (should contain manifest.json)
            module_folder = _find_module_root(extract_dir)
            if module_folder is None:
                return jsonify({
                    'success': False,
                    'message': 'Invalid module package: manifest.json not found'
                })

            # Read and validate manifest ('utf-8-sig' also accepts a leading BOM)
            manifest_path = os.path.join(module_folder, 'manifest.json')
            with open(manifest_path, 'r', encoding='utf-8-sig') as f:
                manifest = json.load(f)

            if not isinstance(manifest, dict):
                return jsonify({
                    'success': False,
                    'message': 'Invalid manifest.json: expected a JSON object'
                })

            required_fields = ['module_key', 'name', 'version', 'author']
            for field in required_fields:
                if field not in manifest:
                    return jsonify({
                        'success': False,
                        'message': f'Invalid manifest.json: missing required field "{field}"'
                    })

            module_key = manifest['module_key']
            # module_key is untrusted and is joined into a filesystem path:
            # reject separators, '..', absolute paths and non-string values.
            if not isinstance(module_key, str) or not re.fullmatch(r'[A-Za-z0-9_-]+', module_key):
                return jsonify({
                    'success': False,
                    'message': 'Invalid manifest.json: "module_key" may only contain '
                               'letters, digits, underscores and hyphens'
                })

            # Check if module already exists
            modules_dir = os.path.join(os.path.dirname(__file__), 'modules')
            target_path = os.path.join(modules_dir, module_key)
            if os.path.exists(target_path):
                return jsonify({
                    'success': False,
                    'message': f'Module "{module_key}" already exists. Please uninstall it first or use a different module key.'
                })

            # Required files check
            required_files = ['manifest.json', '__init__.py']
            for req_file in required_files:
                if not os.path.exists(os.path.join(module_folder, req_file)):
                    return jsonify({
                        'success': False,
                        'message': f'Invalid module package: {req_file} not found'
                    })

            # Copy module to /modules directory
            shutil.copytree(module_folder, target_path)
            print(f"✅ Module '{manifest['name']}' uploaded successfully to {target_path}")
            return jsonify({
                'success': True,
                'message': f"Module '{manifest['name']}' uploaded successfully! Click Install to activate it."
            })
    except zipfile.BadZipFile:
        return jsonify({'success': False, 'message': 'Invalid ZIP file'})
    except json.JSONDecodeError:
        return jsonify({'success': False, 'message': 'Invalid manifest.json: not valid JSON'})
    except Exception as e:
        # Last-resort boundary: log the traceback and report the failure as JSON.
        print(f"❌ Module upload error: {e}")
        traceback.print_exc()
        return jsonify({'success': False, 'message': f'Upload failed: {str(e)}'})
# ==================== PWA SUPPORT ROUTES ====================
@app.route('/manifest.json')
def serve_manifest():
    """Return ``manifest.json`` (the PWA manifest) from the ``static`` directory."""
    static_dir = 'static'
    return send_from_directory(static_dir, 'manifest.json')
@app.route('/sw.js')
def serve_sw():
    """Return ``sw.js`` (the Service Worker script) from the ``static`` directory."""
    static_dir = 'static'
    return send_from_directory(static_dir, 'sw.js')
# ==================== Temp delete me later ====================
@app.route('/whatami')
def whatami():
    """Temporary route to identify device user-agent.

    Returns a small HTML page showing the request's ``User-Agent`` header,
    plus the screen and viewport size filled in by inline JavaScript.
    The header is client-controlled, so it is HTML-escaped before being
    embedded in the page.
    """
    from html import escape

    # Escape the header: it is attacker-controllable and lands inside HTML.
    ua = escape(request.headers.get('User-Agent', 'Unknown'))
    return f"""
    <html>
    <head><meta name="viewport" content="width=device-width, initial-scale=1"></head>
    <body style="font-family: monospace; padding: 20px; word-wrap: break-word;">
    <h1>Device Info</h1>
    <h3>User-Agent:</h3>
    <p style="background: #eee; padding: 10px;">{ua}</p>
    <h3>Screen Size:</h3>
    <p id="screen" style="background: #eee; padding: 10px;">Loading...</p>
    <h3>Viewport Size:</h3>
    <p id="viewport" style="background: #eee; padding: 10px;">Loading...</p>
    <script>
    document.getElementById('screen').textContent =
    'Screen: ' + screen.width + ' x ' + screen.height + ' | Pixel Ratio: ' + window.devicePixelRatio;
    document.getElementById('viewport').textContent =
    'Viewport: ' + window.innerWidth + ' x ' + window.innerHeight;
    </script>
    </body>
    </html>
    """
@app.route('/debug/routes')
@login_required
def list_routes():
    """Debug: list all registered routes as ``endpoint: rule`` lines.

    Restricted to the ``owner`` and ``admin`` roles (403 otherwise). The
    lines are sorted and joined with ``<br>``. Each line is HTML-escaped
    first, because URL rules contain placeholders such as ``<module_key>``
    that a browser would otherwise parse as tags and hide.
    """
    from html import escape

    if session.get('role') not in ['owner', 'admin']:
        return "Access denied", 403

    # Sort the raw text first so the order does not depend on escaping.
    routes = sorted(f"{rule.endpoint}: {rule.rule}" for rule in app.url_map.iter_rules())
    return "<br>".join(escape(line) for line in routes)
# ==================== RUN APPLICATION ====================
if __name__ == '__main__':
    # The server binds to all interfaces, and Flask's debug mode enables an
    # interactive debugger that can execute code from the browser. Debug mode
    # is therefore opt-in (SCANLOOK_DEBUG=1) rather than always on.
    debug_enabled = os.environ.get('SCANLOOK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)