V1.0.0.2 - Refactor: Moved user Management out of App.py and into users.py

This commit is contained in:
Javier
2026-01-23 01:06:30 -06:00
parent c747375f79
commit 518c9478dc
6 changed files with 237 additions and 275 deletions

Binary file not shown.

285
app.py
View File

@@ -4,19 +4,21 @@ Flask Application
Production-Ready Release Production-Ready Release
""" """
from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify, send_from_directory from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify, send_from_directory
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash
from functools import wraps
import sqlite3
import csv
import os import os
from datetime import datetime, timedelta from datetime import timedelta
from io import StringIO
from db import query_db, execute_db, get_db
from blueprints.data_imports import data_imports_bp
# Initialize App FIRST to prevent circular import errors
app = Flask(__name__) app = Flask(__name__)
# Now import your custom modules
from db import query_db, execute_db, get_db
from blueprints.data_imports import data_imports_bp
from blueprints.users import users_bp
from utils import login_required, role_required
app.register_blueprint(data_imports_bp) app.register_blueprint(data_imports_bp)
app.register_blueprint(users_bp)
# V1.0: Use environment variable for production, fallback to demo key for development # V1.0: Use environment variable for production, fallback to demo key for development
app.secret_key = os.environ.get('SCANLOOK_SECRET_KEY', 'scanlook-demo-key-replace-for-production') app.secret_key = os.environ.get('SCANLOOK_SECRET_KEY', 'scanlook-demo-key-replace-for-production')
@@ -26,39 +28,6 @@ app.config['DATABASE'] = os.path.join(os.path.dirname(__file__), 'database', 'sc
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=1) app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=1)
# ==================== AUTHENTICATION DECORATORS ====================
def login_required(f):
"""Require login for route"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Please log in to access this page', 'warning')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
def role_required(*roles):
"""Require specific role(s) for route"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Please log in to access this page', 'warning')
return redirect(url_for('login'))
user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True)
if not user or user['role'] not in roles:
flash('You do not have permission to access this page', 'danger')
return redirect(url_for('dashboard'))
return f(*args, **kwargs)
return decorated_function
return decorator
# ==================== ROUTES: AUTHENTICATION ==================== # ==================== ROUTES: AUTHENTICATION ====================
@app.route('/') @app.route('/')
@@ -361,8 +330,6 @@ def get_status_details(session_id, status):
return jsonify({'success': False, 'message': f'Error: {str(e)}'}) return jsonify({'success': False, 'message': f'Error: {str(e)}'})
# ==================== ROUTES: COUNTING (STAFF) ==================== # ==================== ROUTES: COUNTING (STAFF) ====================
@app.route('/count/<int:session_id>') @app.route('/count/<int:session_id>')
@@ -566,45 +533,6 @@ def get_location_scans(location_count_id):
return jsonify({'success': False, 'message': str(e)}) return jsonify({'success': False, 'message': str(e)})
"""Start counting a location"""
location_code = request.form.get('location_code', '').strip().upper()
if not location_code:
return jsonify({'success': False, 'message': 'Location code required'})
# Check if location already being counted
existing = query_db('''
SELECT * FROM LocationCounts
WHERE session_id = ? AND location_name = ? AND status = 'in_progress'
''', [session_id, location_code], one=True)
if existing:
return jsonify({
'success': False,
'message': f'Location {location_code} is already being counted by another user'
})
# Check expected lots from MASTER baseline
expected_lots = query_db('''
SELECT COUNT(*) as count FROM BaselineInventory_Master
WHERE session_id = ? AND system_bin = ?
''', [session_id, location_code], one=True)['count']
# Create location count record
location_count_id = execute_db('''
INSERT INTO LocationCounts
(session_id, location_name, counted_by, status, expected_lots_master)
VALUES (?, ?, ?, 'in_progress', ?)
''', [session_id, location_code, session['user_id'], expected_lots])
return jsonify({
'success': True,
'location_count_id': location_count_id,
'redirect': url_for('count_location', session_id=session_id, location_count_id=location_count_id)
})
@app.route('/count/<int:session_id>/location/<int:location_count_id>') @app.route('/count/<int:session_id>/location/<int:location_count_id>')
@login_required @login_required
def count_location(session_id, location_count_id): def count_location(session_id, location_count_id):
@@ -1117,197 +1045,6 @@ def finish_location(session_id, location_count_id):
}) })
# ==================== ROUTES: USER MANAGEMENT ====================
@app.route('/settings/users')
@role_required('owner', 'admin')
def manage_users():
"""User management page"""
# Get all users
if session['role'] == 'owner':
# Owners can see everyone
users = query_db('SELECT * FROM Users ORDER BY role, full_name')
else:
# Admins can only see staff
users = query_db("SELECT * FROM Users WHERE role = 'staff' ORDER BY full_name")
return render_template('manage_users.html', users=users)
@app.route('/settings/users/add', methods=['POST'])
@role_required('owner', 'admin')
def add_user():
"""Add a new user"""
data = request.get_json()
username = data.get('username', '').strip()
password = data.get('password', '')
first_name = data.get('first_name', '').strip()
last_name = data.get('last_name', '').strip()
email = data.get('email', '').strip()
role = data.get('role', 'staff')
branch = data.get('branch', 'Main')
# Validation
if not username or not password or not first_name or not last_name:
return jsonify({'success': False, 'message': 'Username, password, first name, and last name are required'})
# Admins can't create admins or owners
if session['role'] == 'admin' and role in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied: Admins can only create Staff users'})
# Check if username exists
existing = query_db('SELECT user_id FROM Users WHERE username = ?', [username], one=True)
if existing:
return jsonify({'success': False, 'message': 'Username already exists'})
# Create user
full_name = f"{first_name} {last_name}"
hashed_password = generate_password_hash(password)
try:
execute_db('''
INSERT INTO Users (username, password, full_name, role, branch, is_active)
VALUES (?, ?, ?, ?, ?, 1)
''', [username, hashed_password, full_name, role, branch])
return jsonify({'success': True, 'message': 'User created successfully'})
except Exception as e:
return jsonify({'success': False, 'message': f'Error creating user: {str(e)}'})
@app.route('/settings/users/<int:user_id>', methods=['GET'])
@role_required('owner', 'admin')
def get_user(user_id):
"""Get user details"""
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
if not user:
return jsonify({'success': False, 'message': 'User not found'})
# Admins can't view other admins or owners
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied'})
# Split full name
name_parts = user['full_name'].split(' ', 1)
first_name = name_parts[0] if len(name_parts) > 0 else ''
last_name = name_parts[1] if len(name_parts) > 1 else ''
# Get email, handle None
email = user['email'] if user['email'] else ''
return jsonify({
'success': True,
'user': {
'user_id': user['user_id'],
'username': user['username'],
'first_name': first_name,
'last_name': last_name,
'email': email,
'role': user['role'],
'branch': user['branch'],
'is_active': user['is_active']
}
})
@app.route('/settings/users/<int:user_id>/update', methods=['POST'])
@role_required('owner', 'admin')
def update_user(user_id):
"""Update user details"""
data = request.get_json()
# Get existing user
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
if not user:
return jsonify({'success': False, 'message': 'User not found'})
# Admins can't edit other admins or owners
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied'})
# Can't edit yourself to change your own role or deactivate
if user_id == session['user_id']:
if data.get('role') != user['role']:
return jsonify({'success': False, 'message': 'Cannot change your own role'})
if data.get('is_active') == 0:
return jsonify({'success': False, 'message': 'Cannot deactivate yourself'})
# Build update
username = data.get('username', '').strip()
first_name = data.get('first_name', '').strip()
last_name = data.get('last_name', '').strip()
email = data.get('email', '').strip()
role = data.get('role', user['role'])
branch = data.get('branch', user['branch'])
is_active = data.get('is_active', user['is_active'])
password = data.get('password', '').strip()
if not username or not first_name or not last_name:
return jsonify({'success': False, 'message': 'Username, first name, and last name are required'})
# Check if username is taken by another user
if username != user['username']:
existing = query_db('SELECT user_id FROM Users WHERE username = ? AND user_id != ?', [username, user_id], one=True)
if existing:
return jsonify({'success': False, 'message': 'Username already taken'})
# Admins can't change role to admin or owner
if session['role'] == 'admin' and role in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied: Cannot assign Admin or Owner role'})
full_name = f"{first_name} {last_name}"
try:
if password:
# Update with new password
hashed_password = generate_password_hash(password)
execute_db('''
UPDATE Users
SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ?, password = ?
WHERE user_id = ?
''', [username, full_name, email, role, branch, is_active, hashed_password, user_id])
else:
# Update without changing password
execute_db('''
UPDATE Users
SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ?
WHERE user_id = ?
''', [username, full_name, email, role, branch, is_active, user_id])
return jsonify({'success': True, 'message': 'User updated successfully'})
except Exception as e:
return jsonify({'success': False, 'message': f'Error updating user: {str(e)}'})
@app.route('/settings/users/<int:user_id>/delete', methods=['POST'])
@role_required('owner', 'admin')
def delete_user(user_id):
"""Delete (deactivate) a user"""
# Get user
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
if not user:
return jsonify({'success': False, 'message': 'User not found'})
# Admins can't delete other admins or owners
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied'})
# Can't delete yourself
if user_id == session['user_id']:
return jsonify({'success': False, 'message': 'Cannot delete yourself'})
# Soft delete (deactivate)
try:
execute_db('UPDATE Users SET is_active = 0 WHERE user_id = ?', [user_id])
return jsonify({'success': True, 'message': 'User deleted successfully'})
except Exception as e:
return jsonify({'success': False, 'message': f'Error deleting user: {str(e)}'})
# ==================== PWA SUPPORT ROUTES ==================== # ==================== PWA SUPPORT ROUTES ====================
@app.route('/manifest.json') @app.route('/manifest.json')
@@ -1323,4 +1060,4 @@ def serve_sw():
# ==================== RUN APPLICATION ==================== # ==================== RUN APPLICATION ====================
if __name__ == '__main__': if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000) app.run(debug=True, host='0.0.0.0', port=5000)

Binary file not shown.

194
blueprints/users.py Normal file
View File

@@ -0,0 +1,194 @@
from flask import Blueprint, render_template, request, jsonify, session
from werkzeug.security import generate_password_hash
from db import query_db, execute_db
from utils import role_required
users_bp = Blueprint('users', __name__)
@users_bp.route('/settings/users')
@role_required('owner', 'admin')
def manage_users():
"""User management page"""
# Get all users
if session['role'] == 'owner':
# Owners can see everyone
users = query_db('SELECT * FROM Users ORDER BY role, full_name')
else:
# Admins can only see staff
users = query_db("SELECT * FROM Users WHERE role = 'staff' ORDER BY full_name")
return render_template('manage_users.html', users=users)
@users_bp.route('/settings/users/add', methods=['POST'])
@role_required('owner', 'admin')
def add_user():
"""Add a new user"""
data = request.get_json()
username = data.get('username', '').strip()
password = data.get('password', '')
first_name = data.get('first_name', '').strip()
last_name = data.get('last_name', '').strip()
email = data.get('email', '').strip()
role = data.get('role', 'staff')
branch = data.get('branch', 'Main')
# Validation
if not username or not password or not first_name or not last_name:
return jsonify({'success': False, 'message': 'Username, password, first name, and last name are required'})
# Admins can't create admins or owners
if session['role'] == 'admin' and role in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied: Admins can only create Staff users'})
# Check if username exists
existing = query_db('SELECT user_id FROM Users WHERE username = ?', [username], one=True)
if existing:
return jsonify({'success': False, 'message': 'Username already exists'})
# Create user
full_name = f"{first_name} {last_name}"
hashed_password = generate_password_hash(password)
try:
execute_db('''
INSERT INTO Users (username, password, full_name, role, branch, is_active)
VALUES (?, ?, ?, ?, ?, 1)
''', [username, hashed_password, full_name, role, branch])
return jsonify({'success': True, 'message': 'User created successfully'})
except Exception as e:
return jsonify({'success': False, 'message': f'Error creating user: {str(e)}'})
@users_bp.route('/settings/users/<int:user_id>', methods=['GET'])
@role_required('owner', 'admin')
def get_user(user_id):
"""Get user details"""
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
if not user:
return jsonify({'success': False, 'message': 'User not found'})
# Admins can't view other admins or owners
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied'})
# Split full name
name_parts = user['full_name'].split(' ', 1)
first_name = name_parts[0] if len(name_parts) > 0 else ''
last_name = name_parts[1] if len(name_parts) > 1 else ''
# Get email, handle None
email = user['email'] if user['email'] else ''
return jsonify({
'success': True,
'user': {
'user_id': user['user_id'],
'username': user['username'],
'first_name': first_name,
'last_name': last_name,
'email': email,
'role': user['role'],
'branch': user['branch'],
'is_active': user['is_active']
}
})
@users_bp.route('/settings/users/<int:user_id>/update', methods=['POST'])
@role_required('owner', 'admin')
def update_user(user_id):
"""Update user details"""
data = request.get_json()
# Get existing user
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
if not user:
return jsonify({'success': False, 'message': 'User not found'})
# Admins can't edit other admins or owners
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied'})
# Can't edit yourself to change your own role or deactivate
if user_id == session['user_id']:
if data.get('role') != user['role']:
return jsonify({'success': False, 'message': 'Cannot change your own role'})
if data.get('is_active') == 0:
return jsonify({'success': False, 'message': 'Cannot deactivate yourself'})
# Build update
username = data.get('username', '').strip()
first_name = data.get('first_name', '').strip()
last_name = data.get('last_name', '').strip()
email = data.get('email', '').strip()
role = data.get('role', user['role'])
branch = data.get('branch', user['branch'])
is_active = data.get('is_active', user['is_active'])
password = data.get('password', '').strip()
if not username or not first_name or not last_name:
return jsonify({'success': False, 'message': 'Username, first name, and last name are required'})
# Check if username is taken by another user
if username != user['username']:
existing = query_db('SELECT user_id FROM Users WHERE username = ? AND user_id != ?', [username, user_id], one=True)
if existing:
return jsonify({'success': False, 'message': 'Username already taken'})
# Admins can't change role to admin or owner
if session['role'] == 'admin' and role in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied: Cannot assign Admin or Owner role'})
full_name = f"{first_name} {last_name}"
try:
if password:
# Update with new password
hashed_password = generate_password_hash(password)
execute_db('''
UPDATE Users
SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ?, password = ?
WHERE user_id = ?
''', [username, full_name, email, role, branch, is_active, hashed_password, user_id])
else:
# Update without changing password
execute_db('''
UPDATE Users
SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ?
WHERE user_id = ?
''', [username, full_name, email, role, branch, is_active, user_id])
return jsonify({'success': True, 'message': 'User updated successfully'})
except Exception as e:
return jsonify({'success': False, 'message': f'Error updating user: {str(e)}'})
@users_bp.route('/settings/users/<int:user_id>/delete', methods=['POST'])
@role_required('owner', 'admin')
def delete_user(user_id):
"""Delete (deactivate) a user"""
# Get user
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
if not user:
return jsonify({'success': False, 'message': 'User not found'})
# Admins can't delete other admins or owners
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
return jsonify({'success': False, 'message': 'Permission denied'})
# Can't delete yourself
if user_id == session['user_id']:
return jsonify({'success': False, 'message': 'Cannot delete yourself'})
# Soft delete (deactivate)
try:
execute_db('UPDATE Users SET is_active = 0 WHERE user_id = ?', [user_id])
return jsonify({'success': True, 'message': 'User deleted successfully'})
except Exception as e:
return jsonify({'success': False, 'message': f'Error deleting user: {str(e)}'})

View File

@@ -25,7 +25,7 @@
<div class="settings-dropdown"> <div class="settings-dropdown">
<button class="btn-settings" onclick="toggleSettings()">⚙️</button> <button class="btn-settings" onclick="toggleSettings()">⚙️</button>
<div id="settingsMenu" class="settings-menu"> <div id="settingsMenu" class="settings-menu">
<a href="{{ url_for('manage_users') }}" class="settings-item"> <a href="{{ url_for('users.manage_users') }}" class="settings-item">
<span class="settings-icon">👥</span> Manage Users <span class="settings-icon">👥</span> Manage Users
</a> </a>
</div> </div>

31
utils.py Normal file
View File

@@ -0,0 +1,31 @@
from functools import wraps
from flask import session, flash, redirect, url_for
from db import query_db
def login_required(f):
"""Require login for route"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Please log in to access this page', 'warning')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
def role_required(*roles):
"""Require specific role(s) for route"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Please log in to access this page', 'warning')
return redirect(url_for('login'))
user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True)
if not user or user['role'] not in roles:
flash('You do not have permission to access this page', 'danger')
return redirect(url_for('dashboard'))
return f(*args, **kwargs)
return decorated_function
return decorator