1327 lines
51 KiB
Python
1327 lines
51 KiB
Python
"""
|
|
ScanLook V1.0 - Inventory Management System
|
|
Flask Application
|
|
Production-Ready Release
|
|
"""
|
|
from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify, send_from_directory
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
from functools import wraps
|
|
import sqlite3
|
|
import csv
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from io import StringIO
|
|
from db import query_db, execute_db, get_db
|
|
from blueprints.data_imports import data_imports_bp
|
|
|
|
app = Flask(__name__)
|
|
|
|
app.register_blueprint(data_imports_bp)
|
|
|
|
# V1.0: Use environment variable for production, fallback to demo key for development
|
|
app.secret_key = os.environ.get('SCANLOOK_SECRET_KEY', 'scanlook-demo-key-replace-for-production')
|
|
app.config['DATABASE'] = os.path.join(os.path.dirname(__file__), 'database', 'scanlook.db')
|
|
|
|
# Session timeout: 1 hour (auto-logout after idle period)
|
|
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=1)
|
|
|
|
|
|
|
|
# ==================== AUTHENTICATION DECORATORS ====================
|
|
|
|
def login_required(f):
|
|
"""Require login for route"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session:
|
|
flash('Please log in to access this page', 'warning')
|
|
return redirect(url_for('login'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
|
|
def role_required(*roles):
|
|
"""Require specific role(s) for route"""
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session:
|
|
flash('Please log in to access this page', 'warning')
|
|
return redirect(url_for('login'))
|
|
|
|
user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True)
|
|
if not user or user['role'] not in roles:
|
|
flash('You do not have permission to access this page', 'danger')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return decorator
|
|
|
|
|
|
# ==================== ROUTES: AUTHENTICATION ====================
|
|
|
|
@app.route('/')
|
|
def index():
|
|
"""Landing page - redirect based on login status"""
|
|
if 'user_id' in session:
|
|
return redirect(url_for('dashboard'))
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
"""Login page"""
|
|
if request.method == 'POST':
|
|
username = request.form.get('username', '').strip()
|
|
password = request.form.get('password', '')
|
|
|
|
user = query_db('SELECT * FROM Users WHERE username = ? AND is_active = 1', [username], one=True)
|
|
|
|
if user and check_password_hash(user['password'], password):
|
|
session.permanent = True # Enable session timeout
|
|
session['user_id'] = user['user_id']
|
|
session['username'] = user['username']
|
|
session['full_name'] = user['full_name']
|
|
session['role'] = user['role']
|
|
flash(f'Welcome back, {user["full_name"]}!', 'success')
|
|
return redirect(url_for('dashboard'))
|
|
else:
|
|
flash('Invalid username or password', 'danger')
|
|
|
|
return render_template('login.html')
|
|
|
|
|
|
@app.route('/logout')
|
|
def logout():
|
|
"""Logout"""
|
|
session.clear()
|
|
flash('You have been logged out', 'info')
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
# ==================== ROUTES: DASHBOARD ====================
|
|
|
|
@app.route('/dashboard')
|
|
@login_required
|
|
def dashboard():
|
|
"""Main dashboard - different views for admin vs staff"""
|
|
role = session.get('role')
|
|
|
|
if role in ['owner', 'admin']:
|
|
# Admin dashboard
|
|
active_sessions = query_db('''
|
|
SELECT s.*, u.full_name as created_by_name,
|
|
COUNT(DISTINCT lc.location_count_id) as total_locations,
|
|
SUM(CASE WHEN lc.status = 'completed' THEN 1 ELSE 0 END) as completed_locations,
|
|
SUM(CASE WHEN lc.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_locations
|
|
FROM CountSessions s
|
|
LEFT JOIN Users u ON s.created_by = u.user_id
|
|
LEFT JOIN LocationCounts lc ON s.session_id = lc.session_id
|
|
WHERE s.status = 'active'
|
|
GROUP BY s.session_id
|
|
ORDER BY s.created_timestamp DESC
|
|
''')
|
|
|
|
return render_template('admin_dashboard.html', sessions=active_sessions)
|
|
|
|
else:
|
|
# Staff dashboard
|
|
active_sessions = query_db('''
|
|
SELECT session_id, session_name, session_type, created_timestamp
|
|
FROM CountSessions
|
|
WHERE status = 'active'
|
|
ORDER BY created_timestamp DESC
|
|
''')
|
|
|
|
return render_template('staff_dashboard.html', sessions=active_sessions)
|
|
|
|
|
|
@app.route('/staff-mode')
|
|
@login_required
|
|
def staff_mode():
|
|
"""Allow admin/owner to switch to staff view for scanning"""
|
|
# Show staff dashboard view regardless of role
|
|
active_sessions = query_db('''
|
|
SELECT session_id, session_name, session_type, created_timestamp
|
|
FROM CountSessions
|
|
WHERE status = 'active'
|
|
ORDER BY created_timestamp DESC
|
|
''')
|
|
|
|
return render_template('staff_dashboard.html', sessions=active_sessions, is_admin_mode=True)
|
|
|
|
|
|
# ==================== ROUTES: SESSION MANAGEMENT (ADMIN) ====================
|
|
|
|
@app.route('/session/create', methods=['GET', 'POST'])
|
|
@role_required('owner', 'admin')
|
|
def create_session():
|
|
"""Create new count session"""
|
|
if request.method == 'POST':
|
|
session_name = request.form.get('session_name', '').strip()
|
|
session_type = request.form.get('session_type')
|
|
|
|
if not session_name:
|
|
flash('Session name is required', 'danger')
|
|
return redirect(url_for('create_session'))
|
|
|
|
session_id = execute_db('''
|
|
INSERT INTO CountSessions (session_name, session_type, created_by, branch)
|
|
VALUES (?, ?, ?, ?)
|
|
''', [session_name, session_type, session['user_id'], 'Main'])
|
|
|
|
flash(f'Session "{session_name}" created successfully!', 'success')
|
|
return redirect(url_for('session_detail', session_id=session_id))
|
|
|
|
return render_template('create_session.html')
|
|
|
|
|
|
@app.route('/session/<int:session_id>')
|
|
@role_required('owner', 'admin')
|
|
def session_detail(session_id):
|
|
"""Session detail and monitoring page"""
|
|
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True)
|
|
|
|
if not sess:
|
|
flash('Session not found', 'danger')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get statistics
|
|
stats = query_db('''
|
|
SELECT
|
|
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.is_deleted = 0) as total_scans,
|
|
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) < 0.01) as matched,
|
|
COUNT(DISTINCT se.lot_number) FILTER (WHERE se.duplicate_status IN ('01', '03', '04') AND se.is_deleted = 0) as duplicates,
|
|
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) >= 0.01) as weight_discrepancy,
|
|
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'wrong_location' AND se.is_deleted = 0) as wrong_location,
|
|
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'ghost_lot' AND se.is_deleted = 0) as ghost_lots,
|
|
COUNT(DISTINCT ml.missing_id) as missing_lots
|
|
FROM CountSessions cs
|
|
LEFT JOIN ScanEntries se ON cs.session_id = se.session_id
|
|
LEFT JOIN MissingLots ml ON cs.session_id = ml.session_id
|
|
WHERE cs.session_id = ?
|
|
''', [session_id], one=True)
|
|
|
|
# Get location progress
|
|
locations = query_db('''
|
|
SELECT lc.*, u.full_name as counter_name
|
|
FROM LocationCounts lc
|
|
LEFT JOIN Users u ON lc.counted_by = u.user_id
|
|
WHERE lc.session_id = ?
|
|
ORDER BY lc.status DESC, lc.location_name
|
|
''', [session_id])
|
|
|
|
# Get active counters
|
|
active_counters = query_db('''
|
|
SELECT DISTINCT u.full_name, lc.location_name, lc.start_timestamp
|
|
FROM LocationCounts lc
|
|
JOIN Users u ON lc.counted_by = u.user_id
|
|
WHERE lc.session_id = ? AND lc.status = 'in_progress'
|
|
ORDER BY lc.start_timestamp DESC
|
|
''', [session_id])
|
|
|
|
return render_template('session_detail.html',
|
|
count_session=sess,
|
|
stats=stats,
|
|
locations=locations,
|
|
active_counters=active_counters)
|
|
|
|
|
|
@app.route('/session/<int:session_id>/status-details/<status>')
|
|
@role_required('owner', 'admin')
|
|
def get_status_details(session_id, status):
|
|
"""Get detailed breakdown for a specific status"""
|
|
|
|
try:
|
|
if status == 'match':
|
|
# Matched lots (not duplicates) - JOIN with CURRENT for live data
|
|
items = query_db('''
|
|
SELECT
|
|
se.*,
|
|
u.full_name as scanned_by_name,
|
|
bic.system_bin as current_system_location,
|
|
bic.system_quantity as current_system_weight
|
|
FROM ScanEntries se
|
|
JOIN Users u ON se.scanned_by = u.user_id
|
|
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
|
|
WHERE se.session_id = ?
|
|
AND se.master_status = 'match'
|
|
AND se.duplicate_status = '00'
|
|
AND se.is_deleted = 0
|
|
ORDER BY se.scan_timestamp DESC
|
|
''', [session_id])
|
|
|
|
elif status == 'duplicates':
|
|
# Duplicate lots (grouped by lot number) - JOIN with CURRENT
|
|
items = query_db('''
|
|
SELECT
|
|
se.lot_number,
|
|
se.item,
|
|
se.description,
|
|
GROUP_CONCAT(DISTINCT se.scanned_location) as scanned_location,
|
|
SUM(se.actual_weight) as actual_weight,
|
|
se.master_expected_location,
|
|
se.master_expected_weight,
|
|
GROUP_CONCAT(DISTINCT u.full_name) as scanned_by_name,
|
|
MIN(se.scan_timestamp) as scan_timestamp,
|
|
bic.system_bin as current_system_location,
|
|
bic.system_quantity as current_system_weight
|
|
FROM ScanEntries se
|
|
JOIN Users u ON se.scanned_by = u.user_id
|
|
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
|
|
WHERE se.session_id = ?
|
|
AND se.duplicate_status IN ('01', '03', '04')
|
|
AND se.is_deleted = 0
|
|
GROUP BY se.lot_number
|
|
ORDER BY se.lot_number
|
|
''', [session_id])
|
|
|
|
elif status == 'wrong_location':
|
|
# Wrong location lots - JOIN with CURRENT
|
|
items = query_db('''
|
|
SELECT
|
|
se.*,
|
|
u.full_name as scanned_by_name,
|
|
bic.system_bin as current_system_location,
|
|
bic.system_quantity as current_system_weight
|
|
FROM ScanEntries se
|
|
JOIN Users u ON se.scanned_by = u.user_id
|
|
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
|
|
WHERE se.session_id = ?
|
|
AND se.master_status = 'wrong_location'
|
|
AND se.is_deleted = 0
|
|
ORDER BY se.scan_timestamp DESC
|
|
''', [session_id])
|
|
|
|
elif status == 'weight_discrepancy':
|
|
# Weight discrepancies (right location, wrong weight) - JOIN with CURRENT
|
|
items = query_db('''
|
|
SELECT
|
|
se.*,
|
|
u.full_name as scanned_by_name,
|
|
bic.system_bin as current_system_location,
|
|
bic.system_quantity as current_system_weight
|
|
FROM ScanEntries se
|
|
JOIN Users u ON se.scanned_by = u.user_id
|
|
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
|
|
WHERE se.session_id = ?
|
|
AND se.master_status = 'match'
|
|
AND se.duplicate_status = '00'
|
|
AND ABS(se.actual_weight - se.master_expected_weight) >= 0.01
|
|
AND se.is_deleted = 0
|
|
ORDER BY ABS(se.actual_weight - se.master_expected_weight) DESC
|
|
''', [session_id])
|
|
|
|
elif status == 'ghost_lot':
|
|
# Ghost lots (not in master baseline) - JOIN with CURRENT
|
|
items = query_db('''
|
|
SELECT
|
|
se.*,
|
|
u.full_name as scanned_by_name,
|
|
bic.system_bin as current_system_location,
|
|
bic.system_quantity as current_system_weight
|
|
FROM ScanEntries se
|
|
JOIN Users u ON se.scanned_by = u.user_id
|
|
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
|
|
WHERE se.session_id = ?
|
|
AND se.master_status = 'ghost_lot'
|
|
AND se.is_deleted = 0
|
|
ORDER BY se.scan_timestamp DESC
|
|
''', [session_id])
|
|
|
|
elif status == 'missing':
|
|
# Missing lots (in master but not scanned)
|
|
items = query_db('''
|
|
SELECT
|
|
bim.lot_number,
|
|
bim.item,
|
|
bim.description,
|
|
bim.system_bin,
|
|
bim.system_quantity
|
|
FROM BaselineInventory_Master bim
|
|
WHERE bim.session_id = ?
|
|
AND bim.lot_number NOT IN (
|
|
SELECT lot_number
|
|
FROM ScanEntries
|
|
WHERE session_id = ? AND is_deleted = 0
|
|
)
|
|
ORDER BY bim.system_bin, bim.lot_number
|
|
''', [session_id, session_id])
|
|
else:
|
|
return jsonify({'success': False, 'message': 'Invalid status'})
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'items': [dict(item) for item in items] if items else []
|
|
})
|
|
|
|
except Exception as e:
|
|
print(f"Error in get_status_details: {str(e)}")
|
|
return jsonify({'success': False, 'message': f'Error: {str(e)}'})
|
|
|
|
|
|
|
|
|
|
# ==================== ROUTES: COUNTING (STAFF) ====================
|
|
|
|
@app.route('/count/<int:session_id>')
|
|
@login_required
|
|
def count_session(session_id):
|
|
"""Select session and begin counting"""
|
|
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ? AND status = "active"',
|
|
[session_id], one=True)
|
|
|
|
if not sess:
|
|
flash('Session not found or not active', 'danger')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Redirect to my_counts page (staff can manage multiple bins)
|
|
return redirect(url_for('my_counts', session_id=session_id))
|
|
|
|
|
|
@app.route('/session/<int:session_id>/my-counts')
|
|
@login_required
|
|
def my_counts(session_id):
|
|
"""Staff view of their active and completed bins"""
|
|
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True)
|
|
|
|
if not sess:
|
|
flash('Session not found', 'danger')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get this user's active bins
|
|
active_bins = query_db('''
|
|
SELECT lc.*,
|
|
COUNT(se.entry_id) as scan_count
|
|
FROM LocationCounts lc
|
|
LEFT JOIN ScanEntries se ON lc.location_count_id = se.location_count_id AND se.is_deleted = 0
|
|
WHERE lc.session_id = ?
|
|
AND lc.counted_by = ?
|
|
AND lc.status = 'in_progress'
|
|
GROUP BY lc.location_count_id
|
|
ORDER BY lc.start_timestamp DESC
|
|
''', [session_id, session['user_id']])
|
|
|
|
# Get this user's completed bins
|
|
completed_bins = query_db('''
|
|
SELECT lc.*,
|
|
COUNT(se.entry_id) as scan_count
|
|
FROM LocationCounts lc
|
|
LEFT JOIN ScanEntries se ON lc.location_count_id = se.location_count_id AND se.is_deleted = 0
|
|
WHERE lc.session_id = ?
|
|
AND lc.counted_by = ?
|
|
AND lc.status = 'completed'
|
|
GROUP BY lc.location_count_id
|
|
ORDER BY lc.end_timestamp DESC
|
|
''', [session_id, session['user_id']])
|
|
|
|
return render_template('my_counts.html',
|
|
count_session=sess,
|
|
active_bins=active_bins,
|
|
completed_bins=completed_bins)
|
|
|
|
|
|
@app.route('/session/<int:session_id>/start-bin', methods=['POST'])
|
|
@login_required
|
|
def start_bin_count(session_id):
|
|
"""Start counting a new bin"""
|
|
location_name = request.form.get('location_name', '').strip().upper()
|
|
|
|
if not location_name:
|
|
flash('Bin number is required', 'danger')
|
|
return redirect(url_for('my_counts', session_id=session_id))
|
|
|
|
# Count expected lots from MASTER baseline for this location
|
|
expected_lots = query_db('''
|
|
SELECT COUNT(DISTINCT lot_number) as count
|
|
FROM BaselineInventory_Master
|
|
WHERE session_id = ? AND system_bin = ?
|
|
''', [session_id, location_name], one=True)
|
|
|
|
expected_count = expected_lots['count'] if expected_lots else 0
|
|
|
|
# Create new location count
|
|
conn = get_db()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
INSERT INTO LocationCounts (session_id, location_name, counted_by, status, start_timestamp, expected_lots_master)
|
|
VALUES (?, ?, ?, 'in_progress', CURRENT_TIMESTAMP, ?)
|
|
''', [session_id, location_name, session['user_id'], expected_count])
|
|
|
|
location_count_id = cursor.lastrowid
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
flash(f'Started counting bin: {location_name}', 'success')
|
|
return redirect(url_for('count_location', session_id=session_id, location_count_id=location_count_id))
|
|
|
|
|
|
@app.route('/location/<int:location_count_id>/complete', methods=['POST'])
|
|
@login_required
|
|
def complete_location(location_count_id):
|
|
"""Mark a location count as complete"""
|
|
# Verify ownership
|
|
loc = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True)
|
|
|
|
if not loc:
|
|
return jsonify({'success': False, 'message': 'Location not found'})
|
|
|
|
if loc['counted_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'})
|
|
|
|
# Mark as completed
|
|
execute_db('''
|
|
UPDATE LocationCounts
|
|
SET status = 'completed', end_timestamp = CURRENT_TIMESTAMP
|
|
WHERE location_count_id = ?
|
|
''', [location_count_id])
|
|
|
|
return jsonify({'success': True, 'message': 'Bin marked as complete'})
|
|
|
|
|
|
@app.route('/location/<int:location_count_id>/reopen', methods=['POST'])
|
|
@login_required
|
|
def reopen_location(location_count_id):
|
|
"""Reopen a completed location (admin/owner only)"""
|
|
# Check permissions
|
|
user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True)
|
|
if not user or user['role'] not in ['owner', 'admin']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'}), 403
|
|
|
|
# Verify location exists
|
|
loc = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True)
|
|
|
|
if not loc:
|
|
return jsonify({'success': False, 'message': 'Location not found'})
|
|
|
|
# Reopen the location
|
|
execute_db('''
|
|
UPDATE LocationCounts
|
|
SET status = 'in_progress', end_timestamp = NULL
|
|
WHERE location_count_id = ?
|
|
''', [location_count_id])
|
|
|
|
return jsonify({'success': True, 'message': 'Bin reopened for counting'})
|
|
|
|
|
|
@app.route('/location/<int:location_count_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def delete_location_count(location_count_id):
|
|
"""Delete all counts for a location (soft delete)"""
|
|
# Verify ownership
|
|
loc = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True)
|
|
|
|
if not loc:
|
|
return jsonify({'success': False, 'message': 'Location not found'})
|
|
|
|
if loc['counted_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'})
|
|
|
|
# Soft delete all scan entries for this location
|
|
execute_db('''
|
|
UPDATE ScanEntries
|
|
SET is_deleted = 1
|
|
WHERE location_count_id = ?
|
|
''', [location_count_id])
|
|
|
|
# Delete the location count record
|
|
execute_db('''
|
|
DELETE FROM LocationCounts
|
|
WHERE location_count_id = ?
|
|
''', [location_count_id])
|
|
|
|
return jsonify({'success': True, 'message': 'Bin count deleted'})
|
|
|
|
|
|
@app.route('/location/<int:location_count_id>/scans')
|
|
@login_required
|
|
def get_location_scans(location_count_id):
|
|
"""Get all scans for a specific location (admin/owner only)"""
|
|
# Check permissions
|
|
user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True)
|
|
if not user or user['role'] not in ['owner', 'admin']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'}), 403
|
|
|
|
try:
|
|
scans = query_db('''
|
|
SELECT
|
|
se.*,
|
|
bic.system_bin as current_system_location,
|
|
bic.system_quantity as current_system_weight
|
|
FROM ScanEntries se
|
|
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
|
|
WHERE se.location_count_id = ?
|
|
AND se.is_deleted = 0
|
|
ORDER BY se.scan_timestamp DESC
|
|
''', [location_count_id])
|
|
|
|
# Convert Row objects to dicts
|
|
scans_list = [dict(scan) for scan in scans] if scans else []
|
|
|
|
return jsonify({'success': True, 'scans': scans_list})
|
|
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'message': str(e)})
|
|
|
|
|
|
|
|
"""Start counting a location"""
|
|
location_code = request.form.get('location_code', '').strip().upper()
|
|
|
|
if not location_code:
|
|
return jsonify({'success': False, 'message': 'Location code required'})
|
|
|
|
# Check if location already being counted
|
|
existing = query_db('''
|
|
SELECT * FROM LocationCounts
|
|
WHERE session_id = ? AND location_name = ? AND status = 'in_progress'
|
|
''', [session_id, location_code], one=True)
|
|
|
|
if existing:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': f'Location {location_code} is already being counted by another user'
|
|
})
|
|
|
|
# Check expected lots from MASTER baseline
|
|
expected_lots = query_db('''
|
|
SELECT COUNT(*) as count FROM BaselineInventory_Master
|
|
WHERE session_id = ? AND system_bin = ?
|
|
''', [session_id, location_code], one=True)['count']
|
|
|
|
# Create location count record
|
|
location_count_id = execute_db('''
|
|
INSERT INTO LocationCounts
|
|
(session_id, location_name, counted_by, status, expected_lots_master)
|
|
VALUES (?, ?, ?, 'in_progress', ?)
|
|
''', [session_id, location_code, session['user_id'], expected_lots])
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'location_count_id': location_count_id,
|
|
'redirect': url_for('count_location', session_id=session_id, location_count_id=location_count_id)
|
|
})
|
|
|
|
|
|
@app.route('/count/<int:session_id>/location/<int:location_count_id>')
|
|
@login_required
|
|
def count_location(session_id, location_count_id):
|
|
"""Count lots in a specific location"""
|
|
# Get session info to determine type (Cycle Count vs Physical)
|
|
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True)
|
|
|
|
location = query_db('''
|
|
SELECT * FROM LocationCounts
|
|
WHERE location_count_id = ? AND session_id = ?
|
|
''', [location_count_id, session_id], one=True)
|
|
|
|
if not location:
|
|
flash('Location not found', 'danger')
|
|
return redirect(url_for('count_session', session_id=session_id))
|
|
|
|
# Check if location is completed and user is staff (not admin/owner)
|
|
if location['status'] == 'completed' and session['role'] == 'staff':
|
|
flash(f'Location {location["location_name"]} has been finalized and cannot accept new scans', 'warning')
|
|
return redirect(url_for('my_counts', session_id=session_id))
|
|
|
|
# Get scans for this location (Scanned Lots)
|
|
scans = query_db('''
|
|
SELECT * FROM ScanEntries
|
|
WHERE location_count_id = ? AND is_deleted = 0
|
|
ORDER BY scan_timestamp DESC
|
|
''', [location_count_id])
|
|
|
|
# NEW LOGIC: Get Expected Lots for Cycle Counts (Grouped & Summed)
|
|
expected_lots = []
|
|
if sess and sess['session_type'] == 'cycle_count':
|
|
expected_lots = query_db('''
|
|
SELECT
|
|
lot_number,
|
|
MAX(item) as item, -- Pick one item code if they differ (rare)
|
|
SUM(system_quantity) as total_weight
|
|
FROM BaselineInventory_Master
|
|
WHERE session_id = ?
|
|
AND system_bin = ?
|
|
AND lot_number NOT IN (
|
|
SELECT lot_number
|
|
FROM ScanEntries
|
|
WHERE location_count_id = ?
|
|
AND is_deleted = 0
|
|
)
|
|
GROUP BY lot_number
|
|
ORDER BY lot_number
|
|
''', [session_id, location['location_name'], location_count_id])
|
|
|
|
return render_template('count_location.html',
|
|
session_id=session_id,
|
|
location=location,
|
|
scans=scans,
|
|
expected_lots=expected_lots,
|
|
session_type=sess['session_type'] if sess else '')
|
|
|
|
@app.route('/count/<int:session_id>/location/<int:location_count_id>/scan', methods=['POST'])
|
|
@login_required
|
|
def scan_lot(session_id, location_count_id):
|
|
"""Process a lot scan with duplicate detection"""
|
|
data = request.get_json()
|
|
lot_number = data.get('lot_number', '').strip()
|
|
weight = data.get('weight')
|
|
confirm_duplicate = data.get('confirm_duplicate', False)
|
|
check_only = data.get('check_only', False) # Just checking for duplicates, not saving
|
|
|
|
if not lot_number:
|
|
return jsonify({'success': False, 'message': 'Lot number required'})
|
|
|
|
if not check_only and not weight:
|
|
return jsonify({'success': False, 'message': 'Weight required'})
|
|
|
|
if not check_only:
|
|
try:
|
|
weight = float(weight)
|
|
except ValueError:
|
|
return jsonify({'success': False, 'message': 'Invalid weight value'})
|
|
|
|
# Get location info
|
|
location = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?',
|
|
[location_count_id], one=True)
|
|
|
|
# Check for duplicates in this session
|
|
existing_scans = query_db('''
|
|
SELECT se.*, lc.location_name, u.full_name
|
|
FROM ScanEntries se
|
|
JOIN LocationCounts lc ON se.location_count_id = lc.location_count_id
|
|
JOIN Users u ON se.scanned_by = u.user_id
|
|
WHERE se.session_id = ? AND se.lot_number = ? AND se.is_deleted = 0
|
|
''', [session_id, lot_number])
|
|
|
|
duplicate_status = '00' # Default: no duplicate
|
|
duplicate_info = None
|
|
needs_confirmation = False
|
|
|
|
if existing_scans:
|
|
# Check for same location duplicates (by this user)
|
|
same_location = [s for s in existing_scans
|
|
if s['location_name'] == location['location_name']
|
|
and s['scanned_by'] == session['user_id']]
|
|
|
|
# Check for different location duplicates (by anyone)
|
|
diff_location = [s for s in existing_scans
|
|
if s['location_name'] != location['location_name']]
|
|
|
|
if same_location and diff_location:
|
|
# Status 04: Duplicate in both same and different locations
|
|
duplicate_status = '04'
|
|
other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location))
|
|
duplicate_info = f"Also found in {other_locs}. Duplicate Lot"
|
|
needs_confirmation = True
|
|
|
|
elif same_location:
|
|
# Status 01: Duplicate in same location only
|
|
duplicate_status = '01'
|
|
duplicate_info = "Duplicate"
|
|
needs_confirmation = True
|
|
|
|
elif diff_location:
|
|
# Status 03: Duplicate in different location only
|
|
duplicate_status = '03'
|
|
other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location))
|
|
duplicate_info = f"Also found in {other_locs}"
|
|
|
|
# If just checking, return early with baseline info
|
|
if check_only:
|
|
# Get baseline info to show what they're scanning
|
|
master_info = query_db('''
|
|
SELECT item, description FROM BaselineInventory_Master
|
|
WHERE session_id = ? AND lot_number = ?
|
|
LIMIT 1
|
|
''', [session_id, lot_number], one=True)
|
|
|
|
if needs_confirmation:
|
|
return jsonify({
|
|
'success': False,
|
|
'needs_confirmation': True,
|
|
'message': 'Lot already scanned, Are you sure?',
|
|
'duplicate_status': duplicate_status,
|
|
'item': master_info['item'] if master_info else None,
|
|
'description': master_info['description'] if master_info else None
|
|
})
|
|
else:
|
|
return jsonify({
|
|
'success': True,
|
|
'needs_confirmation': False,
|
|
'item': master_info['item'] if master_info else None,
|
|
'description': master_info['description'] if master_info else None
|
|
})
|
|
|
|
# If needs confirmation and not yet confirmed, ask user
|
|
if needs_confirmation and not confirm_duplicate:
|
|
return jsonify({
|
|
'success': False,
|
|
'needs_confirmation': True,
|
|
'message': 'Lot already scanned, Are you sure?',
|
|
'duplicate_status': duplicate_status
|
|
})
|
|
|
|
# Check against MASTER baseline
|
|
master = query_db('''
|
|
SELECT * FROM BaselineInventory_Master
|
|
WHERE session_id = ? AND lot_number = ? AND system_bin = ?
|
|
''', [session_id, lot_number, location['location_name']], one=True)
|
|
|
|
# Determine master_status (only if not a duplicate issue)
|
|
if duplicate_status == '00':
|
|
if master:
|
|
# Lot exists in correct location
|
|
master_status = 'match'
|
|
if master['system_quantity'] is not None:
|
|
variance_lbs = weight - master['system_quantity']
|
|
variance_pct = (variance_lbs / master['system_quantity'] * 100) if master['system_quantity'] > 0 else 0
|
|
else:
|
|
variance_lbs = None
|
|
variance_pct = None
|
|
else:
|
|
# Check if lot exists in different location
|
|
master_other = query_db('''
|
|
SELECT * FROM BaselineInventory_Master
|
|
WHERE session_id = ? AND lot_number = ?
|
|
''', [session_id, lot_number], one=True)
|
|
|
|
if master_other:
|
|
master_status = 'wrong_location'
|
|
master = master_other
|
|
variance_lbs = None
|
|
variance_pct = None
|
|
else:
|
|
# Ghost lot
|
|
master_status = 'ghost_lot'
|
|
variance_lbs = None
|
|
variance_pct = None
|
|
else:
|
|
# For duplicates, still check baseline for item info
|
|
if not master:
|
|
master = query_db('''
|
|
SELECT * FROM BaselineInventory_Master
|
|
WHERE session_id = ? AND lot_number = ?
|
|
''', [session_id, lot_number], one=True)
|
|
master_status = 'match' # Don't override with wrong_location for duplicates
|
|
variance_lbs = None
|
|
variance_pct = None
|
|
|
|
# Insert scan
|
|
entry_id = execute_db('''
|
|
INSERT INTO ScanEntries
|
|
(session_id, location_count_id, lot_number, item, description,
|
|
scanned_location, actual_weight, scanned_by,
|
|
master_status, master_expected_location, master_expected_weight,
|
|
master_variance_lbs, master_variance_pct,
|
|
duplicate_status, duplicate_info, comment)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
''', [
|
|
session_id, location_count_id, lot_number,
|
|
master['item'] if master else None,
|
|
master['description'] if master else None,
|
|
location['location_name'], weight, session['user_id'],
|
|
master_status,
|
|
master['system_bin'] if master else None,
|
|
master['system_quantity'] if master else None,
|
|
variance_lbs, variance_pct,
|
|
duplicate_status, duplicate_info, duplicate_info
|
|
])
|
|
|
|
# If this is a confirmed duplicate (01 or 04), update previous scans in same location
|
|
updated_entry_ids = []
|
|
if duplicate_status in ['01', '04'] and confirm_duplicate:
|
|
same_location_ids = [s['entry_id'] for s in existing_scans
|
|
if s['location_name'] == location['location_name']
|
|
and s['scanned_by'] == session['user_id']]
|
|
|
|
for scan_id in same_location_ids:
|
|
execute_db('''
|
|
UPDATE ScanEntries
|
|
SET duplicate_status = ?,
|
|
duplicate_info = ?,
|
|
comment = ?,
|
|
modified_timestamp = CURRENT_TIMESTAMP
|
|
WHERE entry_id = ?
|
|
''', [duplicate_status, duplicate_info, duplicate_info, scan_id])
|
|
updated_entry_ids.append(scan_id)
|
|
|
|
# Update location count
|
|
execute_db('''
|
|
UPDATE LocationCounts
|
|
SET lots_found = lots_found + 1
|
|
WHERE location_count_id = ?
|
|
''', [location_count_id])
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'entry_id': entry_id,
|
|
'master_status': master_status,
|
|
'duplicate_status': duplicate_status,
|
|
'duplicate_info': duplicate_info,
|
|
'master_expected_location': master['system_bin'] if master else None,
|
|
'master_expected_weight': master['system_quantity'] if master else None,
|
|
'actual_weight': weight,
|
|
'variance_lbs': variance_lbs,
|
|
'item': master['item'] if master else 'Unknown Item',
|
|
'description': master['description'] if master else 'Not in system',
|
|
'updated_entry_ids': updated_entry_ids # IDs of scans that were updated to duplicate
|
|
})
|
|
|
|
|
|
@app.route('/scan/<int:entry_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def delete_scan(entry_id):
|
|
"""Soft delete a scan and recalculate duplicate statuses"""
|
|
# Get the scan being deleted
|
|
scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ?', [entry_id], one=True)
|
|
|
|
if not scan:
|
|
return jsonify({'success': False, 'message': 'Scan not found'})
|
|
|
|
# Only allow user to delete their own scans
|
|
if scan['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'})
|
|
|
|
# Soft delete the scan
|
|
execute_db('''
|
|
UPDATE ScanEntries
|
|
SET is_deleted = 1,
|
|
deleted_by = ?,
|
|
deleted_timestamp = CURRENT_TIMESTAMP
|
|
WHERE entry_id = ?
|
|
''', [session['user_id'], entry_id])
|
|
|
|
# Recalculate duplicate statuses for this lot number in this session
|
|
updated_entries = recalculate_duplicate_status(scan['session_id'], scan['lot_number'], scan['scanned_location'])
|
|
|
|
# Update location count
|
|
execute_db('''
|
|
UPDATE LocationCounts
|
|
SET lots_found = lots_found - 1
|
|
WHERE location_count_id = ?
|
|
''', [scan['location_count_id']])
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Scan deleted',
|
|
'updated_entries': updated_entries # Return which scans were updated
|
|
})
|
|
|
|
|
|
@app.route('/scan/<int:entry_id>/details', methods=['GET'])
|
|
@login_required
|
|
def get_scan_details(entry_id):
|
|
"""Get detailed information about a scan"""
|
|
scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ? AND is_deleted = 0', [entry_id], one=True)
|
|
|
|
if not scan:
|
|
return jsonify({'success': False, 'message': 'Scan not found'})
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'scan': dict(scan)
|
|
})
|
|
|
|
|
|
@app.route('/scan/<int:entry_id>/update', methods=['POST'])
|
|
@login_required
|
|
def update_scan(entry_id):
|
|
"""Update scan item, weight and comment"""
|
|
data = request.get_json()
|
|
item = data.get('item', '').strip()
|
|
weight = data.get('weight')
|
|
comment = data.get('comment', '')
|
|
|
|
# Get the scan
|
|
scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ?', [entry_id], one=True)
|
|
|
|
if not scan:
|
|
return jsonify({'success': False, 'message': 'Scan not found'})
|
|
|
|
# Only allow user to update their own scans
|
|
if scan['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'})
|
|
|
|
try:
|
|
weight = float(weight)
|
|
except ValueError:
|
|
return jsonify({'success': False, 'message': 'Invalid weight value'})
|
|
|
|
# Update the scan
|
|
execute_db('''
|
|
UPDATE ScanEntries
|
|
SET item = ?,
|
|
actual_weight = ?,
|
|
comment = ?,
|
|
modified_timestamp = CURRENT_TIMESTAMP
|
|
WHERE entry_id = ?
|
|
''', [item, weight, comment, entry_id])
|
|
|
|
return jsonify({'success': True, 'message': 'Scan updated'})
|
|
|
|
|
|
def recalculate_duplicate_status(session_id, lot_number, current_location):
|
|
"""Recalculate duplicate statuses for a lot after deletion"""
|
|
# Track which entries were updated
|
|
updated_entries = []
|
|
|
|
# Get all active scans for this lot in this session
|
|
scans = query_db('''
|
|
SELECT se.*, lc.location_name, u.full_name, u.user_id as scan_user_id
|
|
FROM ScanEntries se
|
|
JOIN LocationCounts lc ON se.location_count_id = lc.location_count_id
|
|
JOIN Users u ON se.scanned_by = u.user_id
|
|
WHERE se.session_id = ? AND se.lot_number = ? AND se.is_deleted = 0
|
|
ORDER BY se.scan_timestamp
|
|
''', [session_id, lot_number])
|
|
|
|
if not scans:
|
|
return updated_entries
|
|
|
|
# Reset all to status 00
|
|
for scan in scans:
|
|
execute_db('''
|
|
UPDATE ScanEntries
|
|
SET duplicate_status = '00',
|
|
duplicate_info = NULL,
|
|
comment = NULL,
|
|
modified_timestamp = CURRENT_TIMESTAMP
|
|
WHERE entry_id = ?
|
|
''', [scan['entry_id']])
|
|
updated_entries.append({
|
|
'entry_id': scan['entry_id'],
|
|
'duplicate_status': '00',
|
|
'duplicate_info': None
|
|
})
|
|
|
|
# Recalculate statuses
|
|
for i, scan in enumerate(scans):
|
|
# Get previous scans (before this one chronologically)
|
|
prev_scans = scans[:i]
|
|
|
|
if not prev_scans:
|
|
continue # First scan, stays 00
|
|
|
|
same_location = [s for s in prev_scans if s['location_name'] == scan['location_name'] and s['scan_user_id'] == scan['scan_user_id']]
|
|
diff_location = [s for s in prev_scans if s['location_name'] != scan['location_name']]
|
|
|
|
duplicate_status = '00'
|
|
duplicate_info = None
|
|
|
|
if same_location and diff_location:
|
|
# Status 04
|
|
duplicate_status = '04'
|
|
other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location))
|
|
duplicate_info = f"Also found in {other_locs}. Duplicate Lot"
|
|
elif same_location:
|
|
# Status 01
|
|
duplicate_status = '01'
|
|
duplicate_info = "Duplicate"
|
|
elif diff_location:
|
|
# Status 03
|
|
duplicate_status = '03'
|
|
other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location))
|
|
duplicate_info = f"Also found in {other_locs}"
|
|
|
|
# Update this scan if it changed from 00
|
|
if duplicate_status != '00':
|
|
execute_db('''
|
|
UPDATE ScanEntries
|
|
SET duplicate_status = ?,
|
|
duplicate_info = ?,
|
|
comment = ?,
|
|
modified_timestamp = CURRENT_TIMESTAMP
|
|
WHERE entry_id = ?
|
|
''', [duplicate_status, duplicate_info, duplicate_info, scan['entry_id']])
|
|
|
|
# Update our tracking list
|
|
for entry in updated_entries:
|
|
if entry['entry_id'] == scan['entry_id']:
|
|
entry['duplicate_status'] = duplicate_status
|
|
entry['duplicate_info'] = duplicate_info
|
|
break
|
|
|
|
# If status 01 or 04, also update previous scans in same location
|
|
if duplicate_status in ['01', '04'] and same_location:
|
|
for prev_scan in same_location:
|
|
execute_db('''
|
|
UPDATE ScanEntries
|
|
SET duplicate_status = ?,
|
|
duplicate_info = ?,
|
|
comment = ?,
|
|
modified_timestamp = CURRENT_TIMESTAMP
|
|
WHERE entry_id = ?
|
|
''', [duplicate_status, duplicate_info, duplicate_info, prev_scan['entry_id']])
|
|
|
|
# Update tracking for previous scans
|
|
for entry in updated_entries:
|
|
if entry['entry_id'] == prev_scan['entry_id']:
|
|
entry['duplicate_status'] = duplicate_status
|
|
entry['duplicate_info'] = duplicate_info
|
|
break
|
|
|
|
return updated_entries
|
|
|
|
|
|
@app.route('/count/<int:session_id>/location/<int:location_count_id>/finish', methods=['POST'])
|
|
@login_required
|
|
def finish_location(session_id, location_count_id):
|
|
"""Finish counting a location"""
|
|
# Get location info
|
|
location = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?',
|
|
[location_count_id], one=True)
|
|
|
|
if not location:
|
|
return jsonify({'success': False, 'message': 'Location not found'})
|
|
|
|
# Mark location as completed
|
|
execute_db('''
|
|
UPDATE LocationCounts
|
|
SET status = 'completed', end_timestamp = CURRENT_TIMESTAMP
|
|
WHERE location_count_id = ?
|
|
''', [location_count_id])
|
|
|
|
# V1.0: Mark missing lots from MASTER baseline that weren't scanned
|
|
# Get all expected lots for this location from MASTER baseline
|
|
expected_lots = query_db('''
|
|
SELECT lot_number, item, description, system_quantity
|
|
FROM BaselineInventory_Master
|
|
WHERE session_id = ? AND system_bin = ?
|
|
''', [session_id, location['location_name']])
|
|
|
|
# Get all scanned lots for this location
|
|
scanned_lots = query_db('''
|
|
SELECT DISTINCT lot_number
|
|
FROM ScanEntries
|
|
WHERE location_count_id = ? AND is_deleted = 0
|
|
''', [location_count_id])
|
|
|
|
scanned_lot_numbers = {s['lot_number'] for s in scanned_lots}
|
|
|
|
# Insert missing lots
|
|
for expected in expected_lots:
|
|
if expected['lot_number'] not in scanned_lot_numbers:
|
|
execute_db('''
|
|
INSERT INTO MissingLots (session_id, lot_number, master_expected_location, item, master_expected_quantity, marked_by)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
''', [session_id, expected['lot_number'], location['location_name'],
|
|
expected['item'], expected['system_quantity'], session['user_id']])
|
|
|
|
flash('Location count completed!', 'success')
|
|
return jsonify({
|
|
'success': True,
|
|
'redirect': url_for('count_session', session_id=session_id)
|
|
})
|
|
|
|
|
|
# ==================== ROUTES: USER MANAGEMENT ====================
|
|
|
|
@app.route('/settings/users')
|
|
@role_required('owner', 'admin')
|
|
def manage_users():
|
|
"""User management page"""
|
|
# Get all users
|
|
if session['role'] == 'owner':
|
|
# Owners can see everyone
|
|
users = query_db('SELECT * FROM Users ORDER BY role, full_name')
|
|
else:
|
|
# Admins can only see staff
|
|
users = query_db("SELECT * FROM Users WHERE role = 'staff' ORDER BY full_name")
|
|
|
|
return render_template('manage_users.html', users=users)
|
|
|
|
|
|
@app.route('/settings/users/add', methods=['POST'])
|
|
@role_required('owner', 'admin')
|
|
def add_user():
|
|
"""Add a new user"""
|
|
data = request.get_json()
|
|
|
|
username = data.get('username', '').strip()
|
|
password = data.get('password', '')
|
|
first_name = data.get('first_name', '').strip()
|
|
last_name = data.get('last_name', '').strip()
|
|
email = data.get('email', '').strip()
|
|
role = data.get('role', 'staff')
|
|
branch = data.get('branch', 'Main')
|
|
|
|
# Validation
|
|
if not username or not password or not first_name or not last_name:
|
|
return jsonify({'success': False, 'message': 'Username, password, first name, and last name are required'})
|
|
|
|
# Admins can't create admins or owners
|
|
if session['role'] == 'admin' and role in ['admin', 'owner']:
|
|
return jsonify({'success': False, 'message': 'Permission denied: Admins can only create Staff users'})
|
|
|
|
# Check if username exists
|
|
existing = query_db('SELECT user_id FROM Users WHERE username = ?', [username], one=True)
|
|
if existing:
|
|
return jsonify({'success': False, 'message': 'Username already exists'})
|
|
|
|
# Create user
|
|
full_name = f"{first_name} {last_name}"
|
|
hashed_password = generate_password_hash(password)
|
|
|
|
try:
|
|
execute_db('''
|
|
INSERT INTO Users (username, password, full_name, role, branch, is_active)
|
|
VALUES (?, ?, ?, ?, ?, 1)
|
|
''', [username, hashed_password, full_name, role, branch])
|
|
|
|
return jsonify({'success': True, 'message': 'User created successfully'})
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'message': f'Error creating user: {str(e)}'})
|
|
|
|
|
|
@app.route('/settings/users/<int:user_id>', methods=['GET'])
|
|
@role_required('owner', 'admin')
|
|
def get_user(user_id):
|
|
"""Get user details"""
|
|
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
|
|
|
|
if not user:
|
|
return jsonify({'success': False, 'message': 'User not found'})
|
|
|
|
# Admins can't view other admins or owners
|
|
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'})
|
|
|
|
# Split full name
|
|
name_parts = user['full_name'].split(' ', 1)
|
|
first_name = name_parts[0] if len(name_parts) > 0 else ''
|
|
last_name = name_parts[1] if len(name_parts) > 1 else ''
|
|
|
|
# Get email, handle None
|
|
email = user['email'] if user['email'] else ''
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'user': {
|
|
'user_id': user['user_id'],
|
|
'username': user['username'],
|
|
'first_name': first_name,
|
|
'last_name': last_name,
|
|
'email': email,
|
|
'role': user['role'],
|
|
'branch': user['branch'],
|
|
'is_active': user['is_active']
|
|
}
|
|
})
|
|
|
|
|
|
@app.route('/settings/users/<int:user_id>/update', methods=['POST'])
|
|
@role_required('owner', 'admin')
|
|
def update_user(user_id):
|
|
"""Update user details"""
|
|
data = request.get_json()
|
|
|
|
# Get existing user
|
|
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
|
|
|
|
if not user:
|
|
return jsonify({'success': False, 'message': 'User not found'})
|
|
|
|
# Admins can't edit other admins or owners
|
|
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'})
|
|
|
|
# Can't edit yourself to change your own role or deactivate
|
|
if user_id == session['user_id']:
|
|
if data.get('role') != user['role']:
|
|
return jsonify({'success': False, 'message': 'Cannot change your own role'})
|
|
if data.get('is_active') == 0:
|
|
return jsonify({'success': False, 'message': 'Cannot deactivate yourself'})
|
|
|
|
# Build update
|
|
username = data.get('username', '').strip()
|
|
first_name = data.get('first_name', '').strip()
|
|
last_name = data.get('last_name', '').strip()
|
|
email = data.get('email', '').strip()
|
|
role = data.get('role', user['role'])
|
|
branch = data.get('branch', user['branch'])
|
|
is_active = data.get('is_active', user['is_active'])
|
|
password = data.get('password', '').strip()
|
|
|
|
if not username or not first_name or not last_name:
|
|
return jsonify({'success': False, 'message': 'Username, first name, and last name are required'})
|
|
|
|
# Check if username is taken by another user
|
|
if username != user['username']:
|
|
existing = query_db('SELECT user_id FROM Users WHERE username = ? AND user_id != ?', [username, user_id], one=True)
|
|
if existing:
|
|
return jsonify({'success': False, 'message': 'Username already taken'})
|
|
|
|
# Admins can't change role to admin or owner
|
|
if session['role'] == 'admin' and role in ['admin', 'owner']:
|
|
return jsonify({'success': False, 'message': 'Permission denied: Cannot assign Admin or Owner role'})
|
|
|
|
full_name = f"{first_name} {last_name}"
|
|
|
|
try:
|
|
if password:
|
|
# Update with new password
|
|
hashed_password = generate_password_hash(password)
|
|
execute_db('''
|
|
UPDATE Users
|
|
SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ?, password = ?
|
|
WHERE user_id = ?
|
|
''', [username, full_name, email, role, branch, is_active, hashed_password, user_id])
|
|
else:
|
|
# Update without changing password
|
|
execute_db('''
|
|
UPDATE Users
|
|
SET username = ?, full_name = ?, email = ?, role = ?, branch = ?, is_active = ?
|
|
WHERE user_id = ?
|
|
''', [username, full_name, email, role, branch, is_active, user_id])
|
|
|
|
return jsonify({'success': True, 'message': 'User updated successfully'})
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'message': f'Error updating user: {str(e)}'})
|
|
|
|
|
|
@app.route('/settings/users/<int:user_id>/delete', methods=['POST'])
|
|
@role_required('owner', 'admin')
|
|
def delete_user(user_id):
|
|
"""Delete (deactivate) a user"""
|
|
# Get user
|
|
user = query_db('SELECT * FROM Users WHERE user_id = ?', [user_id], one=True)
|
|
|
|
if not user:
|
|
return jsonify({'success': False, 'message': 'User not found'})
|
|
|
|
# Admins can't delete other admins or owners
|
|
if session['role'] == 'admin' and user['role'] in ['admin', 'owner']:
|
|
return jsonify({'success': False, 'message': 'Permission denied'})
|
|
|
|
# Can't delete yourself
|
|
if user_id == session['user_id']:
|
|
return jsonify({'success': False, 'message': 'Cannot delete yourself'})
|
|
|
|
# Soft delete (deactivate)
|
|
try:
|
|
execute_db('UPDATE Users SET is_active = 0 WHERE user_id = ?', [user_id])
|
|
return jsonify({'success': True, 'message': 'User deleted successfully'})
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'message': f'Error deleting user: {str(e)}'})
|
|
|
|
|
|
# ==================== PWA SUPPORT ROUTES ====================
|
|
|
|
@app.route('/manifest.json')
|
|
def serve_manifest():
|
|
"""Serve the PWA manifest file from the static directory"""
|
|
return send_from_directory('static', 'manifest.json')
|
|
|
|
@app.route('/sw.js')
|
|
def serve_sw():
|
|
"""Serve the Service Worker file from the static directory"""
|
|
return send_from_directory('static', 'sw.js')
|
|
|
|
# ==================== RUN APPLICATION ====================
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True, host='0.0.0.0', port=5000)
|