diff --git a/__pycache__/utils.cpython-313.pyc b/__pycache__/utils.cpython-313.pyc new file mode 100644 index 0000000..2082dbf Binary files /dev/null and b/__pycache__/utils.cpython-313.pyc differ diff --git a/app.py b/app.py index c2c8bea..9f7e5d6 100644 --- a/app.py +++ b/app.py @@ -4,19 +4,21 @@ Flask Application Production-Ready Release """ from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify, send_from_directory -from werkzeug.security import check_password_hash, generate_password_hash -from functools import wraps -import sqlite3 -import csv +from werkzeug.security import check_password_hash import os -from datetime import datetime, timedelta -from io import StringIO -from db import query_db, execute_db, get_db -from blueprints.data_imports import data_imports_bp +from datetime import timedelta +# Initialize App FIRST to prevent circular import errors app = Flask(__name__) +# Now import your custom modules +from db import query_db, execute_db, get_db +from blueprints.data_imports import data_imports_bp +from blueprints.users import users_bp +from utils import login_required, role_required + app.register_blueprint(data_imports_bp) +app.register_blueprint(users_bp) # V1.0: Use environment variable for production, fallback to demo key for development app.secret_key = os.environ.get('SCANLOOK_SECRET_KEY', 'scanlook-demo-key-replace-for-production') @@ -26,39 +28,6 @@ app.config['DATABASE'] = os.path.join(os.path.dirname(__file__), 'database', 'sc app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=1) - -# ==================== AUTHENTICATION DECORATORS ==================== - -def login_required(f): - """Require login for route""" - @wraps(f) - def decorated_function(*args, **kwargs): - if 'user_id' not in session: - flash('Please log in to access this page', 'warning') - return redirect(url_for('login')) - return f(*args, **kwargs) - return decorated_function - - -def role_required(*roles): - """Require specific role(s) for route""" - def decorator(f): - @wraps(f) - def decorated_function(*args, **kwargs): - if 'user_id' not in session: - flash('Please log in to access this page', 'warning') - return redirect(url_for('login')) - - user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True) - if not user or user['role'] not in roles: - flash('You do not have permission to access this page', 'danger') - return redirect(url_for('dashboard')) - - return f(*args, **kwargs) - return decorated_function - return decorator - - # ==================== ROUTES: AUTHENTICATION ==================== @app.route('/') @@ -361,8 +330,6 @@ def get_status_details(session_id, status): return jsonify({'success': False, 'message': f'Error: {str(e)}'}) - - # ==================== ROUTES: COUNTING (STAFF) ==================== @app.route('/count/') @@ -566,45 +533,6 @@ def get_location_scans(location_count_id): return jsonify({'success': False, 'message': str(e)}) - - """Start counting a location""" - location_code = request.form.get('location_code', '').strip().upper() - - if not location_code: - return jsonify({'success': False, 'message': 'Location code required'}) - - # Check if location already being counted - existing = query_db(''' - SELECT * FROM LocationCounts - WHERE session_id = ? AND location_name = ? AND status = 'in_progress' - ''', [session_id, location_code], one=True) - - if existing: - return jsonify({ - 'success': False, - 'message': f'Location {location_code} is already being counted by another user' - }) - - # Check expected lots from MASTER baseline - expected_lots = query_db(''' - SELECT COUNT(*) as count FROM BaselineInventory_Master - WHERE session_id = ? AND system_bin = ? - ''', [session_id, location_code], one=True)['count'] - - # Create location count record - location_count_id = execute_db(''' - INSERT INTO LocationCounts - (session_id, location_name, counted_by, status, expected_lots_master) - VALUES (?, ?, ?, 'in_progress', ?) - ''', [session_id, location_code, session['user_id'], expected_lots]) - - return jsonify({ - 'success': True, - 'location_count_id': location_count_id, - 'redirect': url_for('count_location', session_id=session_id, location_count_id=location_count_id) - }) - - @app.route('/count//location/') @login_required def count_location(session_id, location_count_id): @@ -1117,197 +1045,6 @@ def finish_location(session_id, location_count_id): }) -# ==================== ROUTES: USER MANAGEMENT ==================== - -@app.route('/settings/users') -@role_required('owner', 'admin') -def manage_users(): - """User management page""" - # Get all users - if session['role'] == 'owner': - # Owners can see everyone - users = query_db('SELECT * FROM Users ORDER BY role, full_name') - else: - # Admins can only see staff - users = query_db("SELECT * FROM Users WHERE role = 'staff' ORDER BY full_name") - - return render_template('manage_users.html', users=users) - - -@app.route('/settings/users/add', methods=['POST']) -@role_required('owner', 'admin') -def add_user(): - """Add a new user""" - data = request.get_json() - - username = data.get('username', '').strip() - password = data.get('password', '') - first_name = data.get('first_name', '').strip() - last_name = data.get('last_name', '').strip() - email = data.get('email', '').strip() - role = data.get('role', 'staff') - branch = data.get('branch', 'Main') - - # Validation - if not username or not password or not first_name or not last_name: - return jsonify({'success': False, 'message': 'Username, password, first name, and last name are required'}) - - # Admins can't create admins or owners - if session['role'] == 'admin' and role in ['admin', 'owner']: - return jsonify({'success': False, 'message': 'Permission denied: Admins can only create Staff users'}) - - # Check if username exists - existing = query_db('SELECT user_id FROM Users WHERE username = ?', [username], one=True) - if existing: - return jsonify({'success': False, 'message': 'Username already exists'}) - - # Create user - full_name = f"{first_name} {last_name}" - hashed_password = generate_password_hash(password) - - try: - execute_db(''' - INSERT INTO Users (username, password, full_name, role, branch, is_active) - VALUES (?, ?, ?, ?, ?, 1) - ''', [username, hashed_password, full_name, role, branch]) - - return jsonify({'success': True, 'message': 'User created successfully'}) - except Exception as e: - return jsonify({'success': False, 'message': f'Error creating user: {str(e)}'}) - - -@app.route('/settings/users/', methods=['GET']) -@role_required('owner', 'admin') -def get_user(user_id): - """Get user details""" - user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True) - - if not user: - return jsonify({'success': False, 'message': 'User not found'}) - - # Admins can't view other admins or owners - if session['role'] == 'admin' and user['role'] in ['admin', 'owner']: - return jsonify({'success': False, 'message': 'Permission denied'}) - - # Split full name - name_parts = user['full_name'].split(' ', 1) - first_name = name_parts[0] if len(name_parts) > 0 else '' - last_name = name_parts[1] if len(name_parts) > 1 else '' - - # Get email, handle None - email = user['email'] if user['email'] else '' - - return jsonify({ - 'success': True, - 'user': { - 'user_id': user['user_id'], - 'username': user['username'], - 'first_name': first_name, - 'last_name': last_name, - 'email': email, - 'role': user['role'], - 'branch': user['branch'], - 'is_active': user['is_active'] - } - }) - - -@app.route('/settings/users//update', methods=['POST']) -@role_required('owner', 'admin') -def update_user(user_id): - """Update user details""" - data = request.get_json() - - # Get existing user - user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True) - - if not user: - return jsonify({'success': False, 'message': 'User not found'}) - - # Admins can't edit other admins or owners - if session['role'] == 'admin' and user['role'] in ['admin', 'owner']: - return jsonify({'success': False, 'message': 'Permission denied'}) - - # Can't edit yourself to change your own role or deactivate - if user_id == session['user_id']: - if data.get('role') != user['role']: - return jsonify({'success': False, 'message': 'Cannot change your own role'}) - if data.get('is_active') == 0: - return jsonify({'success': False, 'message': 'Cannot deactivate yourself'}) - - # Build update - username = data.get('username', '').strip() - first_name = data.get('first_name', '').strip() - last_name = data.get('last_name', '').strip() - email = data.get('email', '').strip() - role = data.get('role', user['role']) - branch = data.get('branch', user['branch']) - is_active = data.get('is_active', user['is_active']) - password = data.get('password', '').strip() - - if not username or not first_name or not last_name: - return jsonify({'success': False, 'message': 'Username, first name, and last name are required'}) - - # Check if username is taken by another user - if username != user['username']: - existing = query_db('SELECT user_id FROM Users WHERE username = ? AND user_id != ?', [username, user_id], one=True) - if existing: - return jsonify({'success': False, 'message': 'Username already taken'}) - - # Admins can't change role to admin or owner - if session['role'] == 'admin' and role in ['admin', 'owner']: - return jsonify({'success': False, 'message': 'Permission denied: Cannot assign Admin or Owner role'}) - - full_name = f"{first_name} {last_name}" - - try: - if password: - # Update with new password - hashed_password = generate_password_hash(password) - execute_db(''' - UPDATE Users - SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ?, password = ? - WHERE user_id = ? - ''', [username, full_name, email, role, branch, is_active, hashed_password, user_id]) - else: - # Update without changing password - execute_db(''' - UPDATE Users - SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ? - WHERE user_id = ? - ''', [username, full_name, email, role, branch, is_active, user_id]) - - return jsonify({'success': True, 'message': 'User updated successfully'}) - except Exception as e: - return jsonify({'success': False, 'message': f'Error updating user: {str(e)}'}) - - -@app.route('/settings/users//delete', methods=['POST']) -@role_required('owner', 'admin') -def delete_user(user_id): - """Delete (deactivate) a user""" - # Get user - user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True) - - if not user: - return jsonify({'success': False, 'message': 'User not found'}) - - # Admins can't delete other admins or owners - if session['role'] == 'admin' and user['role'] in ['admin', 'owner']: - return jsonify({'success': False, 'message': 'Permission denied'}) - - # Can't delete yourself - if user_id == session['user_id']: - return jsonify({'success': False, 'message': 'Cannot delete yourself'}) - - # Soft delete (deactivate) - try: - execute_db('UPDATE Users SET is_active = 0 WHERE user_id = ?', [user_id]) - return jsonify({'success': True, 'message': 'User deleted successfully'}) - except Exception as e: - return jsonify({'success': False, 'message': f'Error deleting user: {str(e)}'}) - - # ==================== PWA SUPPORT ROUTES ==================== @app.route('/manifest.json') @@ -1323,4 +1060,4 @@ def serve_sw(): # ==================== RUN APPLICATION ==================== if __name__ == '__main__': - app.run(debug=True, host='0.0.0.0', port=5000) + app.run(debug=True, host='0.0.0.0', port=5000) \ No newline at end of file diff --git a/blueprints/__pycache__/users.cpython-313.pyc b/blueprints/__pycache__/users.cpython-313.pyc new file mode 100644 index 0000000..0de51c9 Binary files /dev/null and b/blueprints/__pycache__/users.cpython-313.pyc differ diff --git a/blueprints/users.py b/blueprints/users.py new file mode 100644 index 0000000..47c1acd --- /dev/null +++ b/blueprints/users.py @@ -0,0 +1,194 @@ +from flask import Blueprint, render_template, request, jsonify, session +from werkzeug.security import generate_password_hash +from db import query_db, execute_db +from utils import role_required + +users_bp = Blueprint('users', __name__) + +@users_bp.route('/settings/users') +@role_required('owner', 'admin') +def manage_users(): + """User management page""" + # Get all users + if session['role'] == 'owner': + # Owners can see everyone + users = query_db('SELECT * FROM Users ORDER BY role, full_name') + else: + # Admins can only see staff + users = query_db("SELECT * FROM Users WHERE role = 'staff' ORDER BY full_name") + + return render_template('manage_users.html', users=users) + + +@users_bp.route('/settings/users/add', methods=['POST']) +@role_required('owner', 'admin') +def add_user(): + """Add a new user""" + data = request.get_json() + + username = data.get('username', '').strip() + password = data.get('password', '') + first_name = data.get('first_name', '').strip() + last_name = data.get('last_name', '').strip() + email = data.get('email', '').strip() + role = data.get('role', 'staff') + branch = data.get('branch', 'Main') + + # Validation + if not username or not password or not first_name or not last_name: + return jsonify({'success': False, 'message': 'Username, password, first name, and last name are required'}) + + # Admins can't create admins or owners + if session['role'] == 'admin' and role in ['admin', 'owner']: + return jsonify({'success': False, 'message': 'Permission denied: Admins can only create Staff users'}) + + # Check if username exists + existing = query_db('SELECT user_id FROM Users WHERE username = ?', [username], one=True) + if existing: + return jsonify({'success': False, 'message': 'Username already exists'}) + + # Create user + full_name = f"{first_name} {last_name}" + hashed_password = generate_password_hash(password) + + try: + execute_db(''' + INSERT INTO Users (username, password, full_name, role, branch, is_active) + VALUES (?, ?, ?, ?, ?, 1) + ''', [username, hashed_password, full_name, role, branch]) + + return jsonify({'success': True, 'message': 'User created successfully'}) + except Exception as e: + return jsonify({'success': False, 'message': f'Error creating user: {str(e)}'}) + + +@users_bp.route('/settings/users/', methods=['GET']) +@role_required('owner', 'admin') +def get_user(user_id): + """Get user details""" + user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True) + + if not user: + return jsonify({'success': False, 'message': 'User not found'}) + + # Admins can't view other admins or owners + if session['role'] == 'admin' and user['role'] in ['admin', 'owner']: + return jsonify({'success': False, 'message': 'Permission denied'}) + + # Split full name + name_parts = user['full_name'].split(' ', 1) + first_name = name_parts[0] if len(name_parts) > 0 else '' + last_name = name_parts[1] if len(name_parts) > 1 else '' + + # Get email, handle None + email = user['email'] if user['email'] else '' + + return jsonify({ + 'success': True, + 'user': { + 'user_id': user['user_id'], + 'username': user['username'], + 'first_name': first_name, + 'last_name': last_name, + 'email': email, + 'role': user['role'], + 'branch': user['branch'], + 'is_active': user['is_active'] + } + }) + + +@users_bp.route('/settings/users//update', methods=['POST']) +@role_required('owner', 'admin') +def update_user(user_id): + """Update user details""" + data = request.get_json() + + # Get existing user + user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True) + + if not user: + return jsonify({'success': False, 'message': 'User not found'}) + + # Admins can't edit other admins or owners + if session['role'] == 'admin' and user['role'] in ['admin', 'owner']: + return jsonify({'success': False, 'message': 'Permission denied'}) + + # Can't edit yourself to change your own role or deactivate + if user_id == session['user_id']: + if data.get('role') != user['role']: + return jsonify({'success': False, 'message': 'Cannot change your own role'}) + if data.get('is_active') == 0: + return jsonify({'success': False, 'message': 'Cannot deactivate yourself'}) + + # Build update + username = data.get('username', '').strip() + first_name = data.get('first_name', '').strip() + last_name = data.get('last_name', '').strip() + email = data.get('email', '').strip() + role = data.get('role', user['role']) + branch = data.get('branch', user['branch']) + is_active = data.get('is_active', user['is_active']) + password = data.get('password', '').strip() + + if not username or not first_name or not last_name: + return jsonify({'success': False, 'message': 'Username, first name, and last name are required'}) + + # Check if username is taken by another user + if username != user['username']: + existing = query_db('SELECT user_id FROM Users WHERE username = ? AND user_id != ?', [username, user_id], one=True) + if existing: + return jsonify({'success': False, 'message': 'Username already taken'}) + + # Admins can't change role to admin or owner + if session['role'] == 'admin' and role in ['admin', 'owner']: + return jsonify({'success': False, 'message': 'Permission denied: Cannot assign Admin or Owner role'}) + + full_name = f"{first_name} {last_name}" + + try: + if password: + # Update with new password + hashed_password = generate_password_hash(password) + execute_db(''' + UPDATE Users + SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ?, password = ? + WHERE user_id = ? + ''', [username, full_name, email, role, branch, is_active, hashed_password, user_id]) + else: + # Update without changing password + execute_db(''' + UPDATE Users + SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ? + WHERE user_id = ? + ''', [username, full_name, email, role, branch, is_active, user_id]) + + return jsonify({'success': True, 'message': 'User updated successfully'}) + except Exception as e: + return jsonify({'success': False, 'message': f'Error updating user: {str(e)}'}) + + +@users_bp.route('/settings/users//delete', methods=['POST']) +@role_required('owner', 'admin') +def delete_user(user_id): + """Delete (deactivate) a user""" + # Get user + user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True) + + if not user: + return jsonify({'success': False, 'message': 'User not found'}) + + # Admins can't delete other admins or owners + if session['role'] == 'admin' and user['role'] in ['admin', 'owner']: + return jsonify({'success': False, 'message': 'Permission denied'}) + + # Can't delete yourself + if user_id == session['user_id']: + return jsonify({'success': False, 'message': 'Cannot delete yourself'}) + + # Soft delete (deactivate) + try: + execute_db('UPDATE Users SET is_active = 0 WHERE user_id = ?', [user_id]) + return jsonify({'success': True, 'message': 'User deleted successfully'}) + except Exception as e: + return jsonify({'success': False, 'message': f'Error deleting user: {str(e)}'}) \ No newline at end of file diff --git a/templates/base.html b/templates/base.html index 36af395..975de22 100644 --- a/templates/base.html +++ b/templates/base.html @@ -25,7 +25,7 @@
diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..6f7138b --- /dev/null +++ b/utils.py @@ -0,0 +1,31 @@ +from functools import wraps +from flask import session, flash, redirect, url_for +from db import query_db + +def login_required(f): + """Require login for route""" + @wraps(f) + def decorated_function(*args, **kwargs): + if 'user_id' not in session: + flash('Please log in to access this page', 'warning') + return redirect(url_for('login')) + return f(*args, **kwargs) + return decorated_function + +def role_required(*roles): + """Require specific role(s) for route""" + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if 'user_id' not in session: + flash('Please log in to access this page', 'warning') + return redirect(url_for('login')) + + user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True) + if not user or user['role'] not in roles: + flash('You do not have permission to access this page', 'danger') + return redirect(url_for('dashboard')) + + return f(*args, **kwargs) + return decorated_function + return decorator \ No newline at end of file