Files
ScanLook/modules/invcount/routes.py
Javier 406219547d feat: Implement modular plugin architecture
- Convert invcount to self-contained module
- Add Module Manager for install/uninstall
- Create module_registry database table
- Support hot-reloading of modules
- Move data imports into invcount module
- Update all templates and routes to new structure

Version bumped to 0.16.0
2026-02-07 01:47:49 -06:00

1391 lines
61 KiB
Python

"""
Inventory Counts Module - Routes
Consolidated from counting.py, sessions.py, and admin_locations.py
"""
import csv
import io
from flask import render_template, request, redirect, url_for, flash, jsonify, session, current_app
from db import query_db, execute_db, get_db
from utils import login_required, role_required
def register_routes(bp):
"""Register all invcount routes on the blueprint"""
# =========================================================================
# HELPER FUNCTIONS
# =========================================================================
def get_active_session(session_id):
"""Get session if it exists and is not archived. Returns None if invalid."""
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True)
if not sess or sess['status'] == 'archived':
return None
return sess
# =========================================================================
# COUNTING ROUTES (from counting.py)
# =========================================================================
@bp.route('/admin')
@login_required
def admin_dashboard():
"""Admin dashboard for Counts module"""
# Security check: Ensure user is admin/owner
if session.get('role') not in ['owner', 'admin']:
flash('Access denied. Admin role required.', 'danger')
return redirect(url_for('invcount.index'))
show_archived = request.args.get('show_archived', '0') == '1'
if show_archived:
sessions_list = query_db('''
SELECT s.*, u.full_name as created_by_name,
COUNT(DISTINCT lc.location_count_id) as total_locations,
SUM(CASE WHEN lc.status = 'completed' THEN 1 ELSE 0 END) as completed_locations,
SUM(CASE WHEN lc.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_locations
FROM CountSessions s
LEFT JOIN Users u ON s.created_by = u.user_id
LEFT JOIN LocationCounts lc ON s.session_id = lc.session_id
WHERE s.status IN ('active', 'archived')
GROUP BY s.session_id
ORDER BY s.status ASC, s.created_timestamp DESC
''')
else:
sessions_list = query_db('''
SELECT s.*, u.full_name as created_by_name,
COUNT(DISTINCT lc.location_count_id) as total_locations,
SUM(CASE WHEN lc.status = 'completed' THEN 1 ELSE 0 END) as completed_locations,
SUM(CASE WHEN lc.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_locations
FROM CountSessions s
LEFT JOIN Users u ON s.created_by = u.user_id
LEFT JOIN LocationCounts lc ON s.session_id = lc.session_id
WHERE s.status = 'active'
GROUP BY s.session_id
ORDER BY s.created_timestamp DESC
''')
return render_template('invcount/admin_dashboard.html', sessions=sessions_list, show_archived=show_archived)
@bp.route('/')
@login_required
def index():
"""Counts module landing - show active sessions"""
user_id = session.get('user_id')
has_access = query_db('''
SELECT 1 FROM UserModules um
JOIN Modules m ON um.module_id = m.module_id
WHERE um.user_id = ? AND m.module_key = 'invcount' AND m.is_active = 1
''', [user_id], one=True)
if not has_access:
flash('You do not have access to this module', 'danger')
return redirect(url_for('home'))
active_sessions = query_db('''
SELECT session_id, session_name, session_type, created_timestamp
FROM CountSessions
WHERE status = 'active'
ORDER BY created_timestamp DESC
''')
return render_template('invcount/staff_dashboard.html', sessions=active_sessions)
@bp.route('/count/<int:session_id>')
@login_required
def count_session(session_id):
"""Select session and begin counting"""
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ? AND status = "active"',
[session_id], one=True)
if not sess:
flash('Session not found or not active', 'danger')
return redirect(url_for('invcount.index'))
# Redirect to my_counts page (staff can manage multiple bins)
return redirect(url_for('invcount.my_counts', session_id=session_id))
@bp.route('/session/<int:session_id>/my-counts')
@login_required
def my_counts(session_id):
"""Staff view of their active and completed bins"""
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True)
if not sess:
flash('Session not found', 'danger')
return redirect(url_for('invcount.index'))
if sess['status'] == 'archived':
flash('This session has been archived', 'warning')
return redirect(url_for('invcount.index'))
# Get this user's active bins
active_bins = query_db('''
SELECT lc.*,
COUNT(se.entry_id) as scan_count
FROM LocationCounts lc
LEFT JOIN ScanEntries se ON lc.location_count_id = se.location_count_id AND se.is_deleted = 0
WHERE lc.session_id = ?
AND lc.status = 'in_progress'
AND lc.is_deleted = 0
AND (
lc.counted_by = ?
OR lc.location_count_id IN (
SELECT location_count_id FROM ScanEntries
WHERE scanned_by = ? AND is_deleted = 0
)
)
GROUP BY lc.location_count_id
ORDER BY lc.start_timestamp DESC
''', [session_id, session['user_id'], session['user_id']])
# Get this user's completed bins
completed_bins = query_db('''
SELECT lc.*,
COUNT(se.entry_id) as scan_count
FROM LocationCounts lc
LEFT JOIN ScanEntries se ON lc.location_count_id = se.location_count_id AND se.is_deleted = 0
WHERE lc.session_id = ?
AND lc.status = 'completed'
AND (
lc.counted_by = ?
OR lc.location_count_id IN (
SELECT location_count_id FROM ScanEntries
WHERE scanned_by = ? AND is_deleted = 0
)
)
GROUP BY lc.location_count_id
ORDER BY lc.start_timestamp DESC
''', [session_id, session['user_id'], session['user_id']])
return render_template('/invcount/my_counts.html',
count_session=sess,
active_bins=active_bins,
completed_bins=completed_bins)
@bp.route('/session/<int:session_id>/start-bin', methods=['POST'])
@login_required
def start_bin_count(session_id):
"""Start counting a new bin or resume an existing in-progress one"""
sess = get_active_session(session_id)
if not sess:
flash('Session not found or archived', 'warning')
return redirect(url_for('invcount.index'))
if not sess['master_baseline_timestamp']:
flash('Master File not uploaded. Please upload it before starting bins.', 'warning')
return redirect(url_for('invcount.my_counts', session_id=session_id))
location_name = request.form.get('location_name', '').strip().upper()
if not location_name:
flash('Bin number is required', 'danger')
return redirect(url_for('invcount.my_counts', session_id=session_id))
# --- NEW LOGIC: Check for existing in-progress bin ---
existing_bin = query_db('''
SELECT location_count_id
FROM LocationCounts
WHERE session_id = ? AND location_name = ? AND status = 'in_progress'
''', [session_id, location_name], one=True)
if existing_bin:
flash(f'Resuming bin: {location_name}', 'info')
return redirect(url_for('invcount.count_location',
session_id=session_id,
location_count_id=existing_bin['location_count_id']))
# --- END NEW LOGIC ---
# Count expected lots from MASTER baseline for this location
expected_lots = query_db('''
SELECT COUNT(DISTINCT lot_number) as count
FROM BaselineInventory_Master
WHERE session_id = ? AND system_bin = ?
''', [session_id, location_name], one=True)
expected_count = expected_lots['count'] if expected_lots else 0
# Create new location count if none existed
conn = get_db()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO LocationCounts (session_id, location_name, counted_by, status, start_timestamp, expected_lots_master)
VALUES (?, ?, ?, 'in_progress', CURRENT_TIMESTAMP, ?)
''', [session_id, location_name, session['user_id'], expected_count])
location_count_id = cursor.lastrowid
conn.commit()
conn.close()
flash(f'Started counting bin: {location_name}', 'success')
return redirect(url_for('invcount.count_location', session_id=session_id, location_count_id=location_count_id))
@bp.route('/location/<int:location_count_id>/complete', methods=['POST'])
@login_required
def complete_location(location_count_id):
"""Mark a location count as complete (Simple toggle)"""
# Verify ownership
loc = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True)
if not loc:
return jsonify({'success': False, 'message': 'Location not found'})
if loc['counted_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'})
# Mark as completed
execute_db('''
UPDATE LocationCounts
SET status = 'completed', end_timestamp = CURRENT_TIMESTAMP
WHERE location_count_id = ?
''', [location_count_id])
return jsonify({'success': True, 'message': 'Bin marked as complete'})
@bp.route('/count/<int:session_id>/location/<int:location_count_id>')
@login_required
def count_location(session_id, location_count_id):
"""Count lots in a specific location"""
# Get session info to determine type (Cycle Count vs Physical)
sess = get_active_session(session_id)
if not sess:
flash('Session not found or archived', 'warning')
return redirect(url_for('invcount.index'))
if not sess['master_baseline_timestamp']:
flash('Master File not uploaded. Please upload it before starting bins.', 'warning')
return redirect(url_for('invcount.my_counts', session_id=session_id))
location = query_db('''
SELECT * FROM LocationCounts
WHERE location_count_id = ? AND session_id = ?
''', [location_count_id, session_id], one=True)
if not location:
flash('Location not found', 'danger')
return redirect(url_for('invcount.count_session', session_id=session_id))
# Check if location is completed and user is staff (not admin/owner)
if location['status'] == 'completed' and session['role'] == 'staff':
flash(f'Location {location["location_name"]} has been finalized and cannot accept new scans', 'warning')
return redirect(url_for('invcount.my_counts', session_id=session_id))
# Get scans for this location (Scanned Lots)
scans = query_db('''
SELECT * FROM ScanEntries
WHERE location_count_id = ? AND is_deleted = 0
ORDER BY scan_timestamp DESC
''', [location_count_id])
# NEW LOGIC: Get Expected Lots for Cycle Counts (Grouped & Summed)
expected_lots = []
if sess and sess['session_type'] == 'cycle_count':
expected_lots = query_db('''
SELECT
lot_number,
MAX(item) as item, -- Pick one item code if they differ (rare)
SUM(system_quantity) as total_weight
FROM BaselineInventory_Master
WHERE session_id = ?
AND system_bin = ?
AND lot_number NOT IN (
SELECT lot_number
FROM ScanEntries
WHERE location_count_id = ?
AND is_deleted = 0
)
GROUP BY lot_number
ORDER BY lot_number
''', [session_id, location['location_name'], location_count_id])
return render_template('invcount/count_location.html',
session_id=session_id,
location=location,
scans=scans,
expected_lots=expected_lots,
session_type=sess['session_type'] if sess else '')
@bp.route('/count/<int:session_id>/location/<int:location_count_id>/scan', methods=['POST'])
@login_required
def scan_lot(session_id, location_count_id):
"""Process a lot scan with duplicate detection"""
sess = get_active_session(session_id)
if not sess:
return jsonify({'success': False, 'message': 'Session not found or archived'})
data = request.get_json()
lot_number = data.get('lot_number', '').strip()
weight = data.get('weight')
confirm_duplicate = data.get('confirm_duplicate', False)
check_only = data.get('check_only', False) # Just checking for duplicates, not saving
if not lot_number:
return jsonify({'success': False, 'message': 'Lot number required'})
if not check_only and not weight:
return jsonify({'success': False, 'message': 'Weight required'})
if not check_only:
try:
weight = float(weight)
except ValueError:
return jsonify({'success': False, 'message': 'Invalid weight value'})
# Get location info
location = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?',
[location_count_id], one=True)
# Check for duplicates in this session
existing_scans = query_db('''
SELECT se.*, lc.location_name, u.full_name
FROM ScanEntries se
JOIN LocationCounts lc ON se.location_count_id = lc.location_count_id
JOIN Users u ON se.scanned_by = u.user_id
WHERE se.session_id = ? AND se.lot_number = ? AND se.is_deleted = 0
''', [session_id, lot_number])
duplicate_status = '00' # Default: no duplicate
duplicate_info = None
needs_confirmation = False
if existing_scans:
# Check for same location duplicates (by this user)
same_location = [s for s in existing_scans
if s['location_name'] == location['location_name']
and s['scanned_by'] == session['user_id']]
# Check for different location duplicates (by anyone)
diff_location = [s for s in existing_scans
if s['location_name'] != location['location_name']]
if same_location and diff_location:
# Status 04: Duplicate in both same and different locations
duplicate_status = '04'
other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location))
duplicate_info = f"Also found in {other_locs}. Duplicate Lot"
needs_confirmation = True
elif same_location:
# Status 01: Duplicate in same location only
duplicate_status = '01'
duplicate_info = "Duplicate"
needs_confirmation = True
elif diff_location:
# Status 03: Duplicate in different location only
duplicate_status = '03'
other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location))
duplicate_info = f"Also found in {other_locs}"
# If just checking, return early with baseline info
if check_only:
# Get baseline info to show what they're scanning
master_info = query_db('''
SELECT item, description FROM BaselineInventory_Master
WHERE session_id = ? AND lot_number = ?
LIMIT 1
''', [session_id, lot_number], one=True)
if needs_confirmation:
return jsonify({
'success': False,
'needs_confirmation': True,
'message': 'Lot already scanned, Are you sure?',
'duplicate_status': duplicate_status,
'item': master_info['item'] if master_info else None,
'description': master_info['description'] if master_info else None
})
else:
return jsonify({
'success': True,
'needs_confirmation': False,
'item': master_info['item'] if master_info else None,
'description': master_info['description'] if master_info else None
})
# If needs confirmation and not yet confirmed, ask user
if needs_confirmation and not confirm_duplicate:
return jsonify({
'success': False,
'needs_confirmation': True,
'message': 'Lot already scanned, Are you sure?',
'duplicate_status': duplicate_status
})
# Check against MASTER baseline
master = query_db('''
SELECT
system_bin,
SUM(system_quantity) as system_quantity,
MAX(item) as item,
MAX(description) as description
FROM BaselineInventory_Master
WHERE session_id = ? AND lot_number = ? AND system_bin = ?
GROUP BY system_bin
''', [session_id, lot_number, location['location_name']], one=True)
# Determine master_status (only if not a duplicate issue)
if duplicate_status == '00':
if master:
# Lot exists in correct location
master_status = 'match'
if master['system_quantity'] is not None:
variance_lbs = weight - master['system_quantity']
variance_pct = (variance_lbs / master['system_quantity'] * 100) if master['system_quantity'] > 0 else 0
else:
variance_lbs = None
variance_pct = None
else:
# Check if lot exists in different location
master_other = query_db('''
SELECT
system_bin,
SUM(system_quantity) as system_quantity,
MAX(item) as item,
MAX(description) as description
FROM BaselineInventory_Master
WHERE session_id = ? AND lot_number = ?
GROUP BY system_bin
ORDER BY system_bin
LIMIT 1
''', [session_id, lot_number], one=True)
if master_other:
master_status = 'wrong_location'
master = master_other
variance_lbs = None
variance_pct = None
else:
# Ghost lot
master_status = 'ghost_lot'
variance_lbs = None
variance_pct = None
else:
# For duplicates, still check baseline for item info
if not master:
master = query_db('''
SELECT
system_bin,
SUM(system_quantity) as system_quantity,
MAX(item) as item,
MAX(description) as description
FROM BaselineInventory_Master
WHERE session_id = ? AND lot_number = ?
GROUP BY system_bin
ORDER BY system_bin
LIMIT 1
''', [session_id, lot_number], one=True)
master_status = 'match' # Don't override with wrong_location for duplicates
variance_lbs = None
variance_pct = None
# Insert scan
entry_id = execute_db('''
INSERT INTO ScanEntries
(session_id, location_count_id, lot_number, item, description,
scanned_location, actual_weight, scanned_by,
master_status, master_expected_location, master_expected_weight,
master_variance_lbs, master_variance_pct,
duplicate_status, duplicate_info, comment)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', [
session_id, location_count_id, lot_number,
master['item'] if master else None,
master['description'] if master else None,
location['location_name'], weight, session['user_id'],
master_status,
master['system_bin'] if master else None,
master['system_quantity'] if master else None,
variance_lbs, variance_pct,
duplicate_status, duplicate_info, duplicate_info
])
# If this is a confirmed duplicate (01 or 04), update previous scans in same location
updated_entry_ids = []
if duplicate_status in ['01', '04'] and confirm_duplicate:
same_location_ids = [s['entry_id'] for s in existing_scans
if s['location_name'] == location['location_name']
and s['scanned_by'] == session['user_id']]
for scan_id in same_location_ids:
execute_db('''
UPDATE ScanEntries
SET duplicate_status = ?,
duplicate_info = ?,
comment = ?,
modified_timestamp = CURRENT_TIMESTAMP
WHERE entry_id = ?
''', [duplicate_status, duplicate_info, duplicate_info, scan_id])
updated_entry_ids.append(scan_id)
# Update location count
execute_db('''
UPDATE LocationCounts
SET lots_found = lots_found + 1
WHERE location_count_id = ?
''', [location_count_id])
return jsonify({
'success': True,
'entry_id': entry_id,
'master_status': master_status,
'duplicate_status': duplicate_status,
'duplicate_info': duplicate_info,
'master_expected_location': master['system_bin'] if master else None,
'master_expected_weight': master['system_quantity'] if master else None,
'actual_weight': weight,
'variance_lbs': variance_lbs,
'item': master['item'] if master else 'Unknown Item',
'description': master['description'] if master else 'Not in system',
'updated_entry_ids': updated_entry_ids # IDs of scans that were updated to duplicate
})
@bp.route('/scan/<int:entry_id>/delete', methods=['POST'])
@login_required
def delete_scan(entry_id):
"""Soft delete a scan and recalculate duplicate statuses"""
# Get the scan being deleted
scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ? AND is_deleted = 0', [entry_id], one=True)
if not scan:
return jsonify({'success': False, 'message': 'Scan not found'})
# Only allow user to delete their own scans
if scan['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'})
# Soft delete the scan
execute_db('''
UPDATE ScanEntries
SET is_deleted = 1,
deleted_by = ?,
deleted_timestamp = CURRENT_TIMESTAMP
WHERE entry_id = ?
''', [session['user_id'], entry_id])
# Recalculate duplicate statuses for this lot number in this session
updated_entries = recalculate_duplicate_status(scan['session_id'], scan['lot_number'], scan['scanned_location'])
# Update location count
execute_db('''
UPDATE LocationCounts
SET lots_found = lots_found - 1
WHERE location_count_id = ?
''', [scan['location_count_id']])
return jsonify({
'success': True,
'message': 'Scan deleted',
'updated_entries': updated_entries # Return which scans were updated
})
@bp.route('/scan/<int:entry_id>/details', methods=['GET'])
@login_required
def get_scan_details(entry_id):
"""Get detailed information about a scan"""
scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ? AND is_deleted = 0', [entry_id], one=True)
if not scan:
return jsonify({'success': False, 'message': 'Scan not found'})
return jsonify({
'success': True,
'scan': dict(scan)
})
@bp.route('/scan/<int:entry_id>/update', methods=['POST'])
@login_required
def update_scan(entry_id):
"""Update scan item, weight and comment"""
data = request.get_json()
item = data.get('item', '').strip()
weight = data.get('weight')
comment = data.get('comment', '')
# Get the scan
scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ? AND is_deleted = 0', [entry_id], one=True)
if not scan:
return jsonify({'success': False, 'message': 'Scan not found'})
# Only allow user to update their own scans
if scan['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'})
try:
weight = float(weight)
except ValueError:
return jsonify({'success': False, 'message': 'Invalid weight value'})
# Update the scan
execute_db('''
UPDATE ScanEntries
SET item = ?,
actual_weight = ?,
comment = ?,
modified_timestamp = CURRENT_TIMESTAMP
WHERE entry_id = ? and is_deleted = 0
''', [item, weight, comment, entry_id])
return jsonify({'success': True, 'message': 'Scan updated'})
def recalculate_duplicate_status(session_id, lot_number, current_location):
"""Recalculate duplicate statuses for a lot after deletion"""
# Track which entries were updated
updated_entries = []
# Get all active scans for this lot in this session
scans = query_db('''
SELECT se.*, lc.location_name, u.full_name, u.user_id as scan_user_id
FROM ScanEntries se
JOIN LocationCounts lc ON se.location_count_id = lc.location_count_id
JOIN Users u ON se.scanned_by = u.user_id
WHERE se.session_id = ? AND se.lot_number = ? AND se.is_deleted = 0
ORDER BY se.scan_timestamp
''', [session_id, lot_number])
if not scans:
return updated_entries
# Reset all to status 00
for scan in scans:
execute_db('''
UPDATE ScanEntries
SET duplicate_status = '00',
duplicate_info = NULL,
comment = NULL,
modified_timestamp = CURRENT_TIMESTAMP
WHERE entry_id = ? and is_deleted = 0
''', [scan['entry_id']])
updated_entries.append({
'entry_id': scan['entry_id'],
'duplicate_status': '00',
'duplicate_info': None
})
# Recalculate statuses
for i, scan in enumerate(scans):
# Get previous scans (before this one chronologically)
prev_scans = scans[:i]
if not prev_scans:
continue # First scan, stays 00
same_location = [s for s in prev_scans if s['location_name'] == scan['location_name'] and s['scan_user_id'] == scan['scan_user_id']]
diff_location = [s for s in prev_scans if s['location_name'] != scan['location_name']]
duplicate_status = '00'
duplicate_info = None
if same_location and diff_location:
# Status 04
duplicate_status = '04'
other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location))
duplicate_info = f"Also found in {other_locs}. Duplicate Lot"
elif same_location:
# Status 01
duplicate_status = '01'
duplicate_info = "Duplicate"
elif diff_location:
# Status 03
duplicate_status = '03'
other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location))
duplicate_info = f"Also found in {other_locs}"
# Update this scan if it changed from 00
if duplicate_status != '00':
execute_db('''
UPDATE ScanEntries
SET duplicate_status = ?,
duplicate_info = ?,
comment = ?,
modified_timestamp = CURRENT_TIMESTAMP
WHERE entry_id = ? and is_deleted = 0
''', [duplicate_status, duplicate_info, duplicate_info, scan['entry_id']])
# Update our tracking list
for entry in updated_entries:
if entry['entry_id'] == scan['entry_id']:
entry['duplicate_status'] = duplicate_status
entry['duplicate_info'] = duplicate_info
break
# If status 01 or 04, also update previous scans in same location
if duplicate_status in ['01', '04'] and same_location:
for prev_scan in same_location:
execute_db('''
UPDATE ScanEntries
SET duplicate_status = ?,
duplicate_info = ?,
comment = ?,
modified_timestamp = CURRENT_TIMESTAMP
WHERE entry_id = ? and is_deleted = 0
''', [duplicate_status, duplicate_info, duplicate_info, prev_scan['entry_id']])
# Update tracking for previous scans
for entry in updated_entries:
if entry['entry_id'] == prev_scan['entry_id']:
entry['duplicate_status'] = duplicate_status
entry['duplicate_info'] = duplicate_info
break
return updated_entries
@bp.route('/count/<int:session_id>/location/<int:location_count_id>/finish', methods=['POST'])
@login_required
def finish_location(session_id, location_count_id):
"""Finish counting a location"""
sess = get_active_session(session_id)
if not sess:
return jsonify({'success': False, 'message': 'Session not found or archived'})
# Get location info
location = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?',
[location_count_id], one=True)
if not location:
return jsonify({'success': False, 'message': 'Location not found'})
# Mark location as completed
execute_db('''
UPDATE LocationCounts
SET status = 'completed', end_timestamp = CURRENT_TIMESTAMP
WHERE location_count_id = ?
''', [location_count_id])
# V1.0: Mark missing lots from MASTER baseline that weren't scanned
# Get all expected lots for this location from MASTER baseline
expected_lots = query_db('''
SELECT lot_number, item, description, system_quantity
FROM BaselineInventory_Master
WHERE session_id = ? AND system_bin = ?
''', [session_id, location['location_name']])
# Get all scanned lots for this location
scanned_lots = query_db('''
SELECT DISTINCT lot_number
FROM ScanEntries
WHERE location_count_id = ? AND is_deleted = 0
''', [location_count_id])
scanned_lot_numbers = {s['lot_number'] for s in scanned_lots}
# Insert missing lots
for expected in expected_lots:
if expected['lot_number'] not in scanned_lot_numbers:
execute_db('''
INSERT INTO MissingLots (session_id, lot_number, master_expected_location, item, master_expected_quantity, marked_by)
VALUES (?, ?, ?, ?, ?, ?)
''', [session_id, expected['lot_number'], location['location_name'],
expected['item'], expected['system_quantity'], session['user_id']])
flash('Location count completed!', 'success')
return jsonify({
'success': True,
'redirect': url_for('invcount.count_session', session_id=session_id)
})
@bp.route('/session/<int:session_id>/finalize-all', methods=['POST'])
@login_required
def finalize_all_locations(session_id):
"""Finalize all 'in_progress' locations in a session"""
if session.get('role') not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'}), 403
# 1. Get all in_progress locations for this session
locations = query_db('''
SELECT location_count_id, location_name
FROM LocationCounts
WHERE session_id = ?
AND status = 'in_progress'
AND is_deleted = 0
''', [session_id])
if not locations:
return jsonify({'success': True, 'message': 'No open bins to finalize.'})
# 2. Loop through and run the finalize logic for each
for loc in locations:
# We reuse the logic from your existing finish_location route
execute_db('''
UPDATE LocationCounts
SET status = 'completed', end_timestamp = CURRENT_TIMESTAMP
WHERE location_count_id = ?
''', [loc['location_count_id']])
# Identify missing lots from MASTER baseline
expected_lots = query_db('''
SELECT lot_number, item, description, system_quantity
FROM BaselineInventory_Master
WHERE session_id = ? AND system_bin = ?
''', [session_id, loc['location_name']])
scanned_lots = query_db('''
SELECT DISTINCT lot_number
FROM ScanEntries
WHERE location_count_id = ? AND is_deleted = 0
''', [loc['location_count_id']])
scanned_lot_numbers = {s['lot_number'] for s in scanned_lots}
for expected in expected_lots:
if expected['lot_number'] not in scanned_lot_numbers:
execute_db('''
INSERT INTO MissingLots (session_id, lot_number, master_expected_location, item, master_expected_quantity, marked_by)
VALUES (?, ?, ?, ?, ?, ?)
''', [session_id, expected['lot_number'], loc['location_name'],
expected['item'], expected['system_quantity'], session['user_id']])
return jsonify({'success': True, 'message': f'Successfully finalized {len(locations)} bins.'})
# =========================================================================
# SESSION MANAGEMENT ROUTES (from sessions.py)
# =========================================================================
@bp.route('/session/create', methods=['GET', 'POST'])
@role_required('owner', 'admin')
def create_session():
"""Create new count session"""
if request.method == 'POST':
session_name = request.form.get('session_name', '').strip()
session_type = request.form.get('session_type')
if not session_name:
flash('Session name is required', 'danger')
return redirect(url_for('invcount.create_session'))
session_id = execute_db('''
INSERT INTO CountSessions (session_name, session_type, created_by, branch)
VALUES (?, ?, ?, ?)
''', [session_name, session_type, session['user_id'], 'Main'])
flash(f'Session "{session_name}" created successfully!', 'success')
return redirect(url_for('invcount.session_detail', session_id=session_id))
return render_template('invcount/create_session.html')
@bp.route('/session/<int:session_id>')
@role_required('owner', 'admin')
def session_detail(session_id):
"""Session detail and monitoring page"""
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True)
if not sess:
flash('Session not found', 'danger')
return redirect(url_for('dashboard'))
# Get statistics
stats = query_db('''
SELECT
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.is_deleted = 0) as total_scans,
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) < 0.01) as matched,
COUNT(DISTINCT se.lot_number) FILTER (WHERE se.duplicate_status IN ('01', '03', '04') AND se.is_deleted = 0) as duplicates,
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) >= 0.01) as weight_discrepancy,
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'wrong_location' AND se.is_deleted = 0) as wrong_location,
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'ghost_lot' AND se.is_deleted = 0) as ghost_lots,
COUNT(DISTINCT ml.missing_id) as missing_lots
FROM CountSessions cs
LEFT JOIN ScanEntries se ON cs.session_id = se.session_id
LEFT JOIN MissingLots ml ON cs.session_id = ml.session_id
WHERE cs.session_id = ?
''', [session_id], one=True)
# Get location progress
# We add a subquery to count the actual missing lots for each bin
locations = query_db('''
SELECT
lc.*,
u.full_name as counter_name,
(SELECT COUNT(*) FROM MissingLots ml
WHERE ml.session_id = lc.session_id
AND ml.master_expected_location = lc.location_name) as lots_missing_calc
FROM LocationCounts lc
LEFT JOIN Users u ON lc.counted_by = u.user_id
WHERE lc.session_id = ?
AND lc.is_deleted = 0
ORDER BY lc.status DESC, lc.location_name
''', [session_id])
# Get active counters
active_counters = query_db('''
SELECT
u.full_name,
u.user_id,
MAX(lc.start_timestamp) AS start_timestamp, -- Add the alias here!
lc.location_name
FROM LocationCounts lc
JOIN Users u ON lc.counted_by = u.user_id
WHERE lc.session_id = ?
AND lc.status = 'in_progress'
AND lc.is_deleted = 0
GROUP BY u.user_id
ORDER BY start_timestamp DESC
''', [session_id])
return render_template('invcount/session_detail.html',
count_session=sess,
stats=stats,
locations=locations,
active_counters=active_counters)
@bp.route('/session/<int:session_id>/status-details/<status>')
@role_required('owner', 'admin')
def get_status_details(session_id, status):
"""Get detailed breakdown for a specific status"""
try:
if status == 'match':
# Matched lots (not duplicates) - JOIN with CURRENT for live data
items = query_db('''
SELECT
se.*,
u.full_name as scanned_by_name,
bic.system_bin as current_system_location,
bic.system_quantity as current_system_weight
FROM ScanEntries se
JOIN Users u ON se.scanned_by = u.user_id
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
WHERE se.session_id = ?
AND se.master_status = 'match'
AND se.duplicate_status = '00'
AND se.master_variance_lbs = 0
AND se.is_deleted = 0
ORDER BY se.scan_timestamp DESC
''', [session_id])
elif status == 'duplicates':
# Duplicate lots (grouped by lot number) - JOIN with CURRENT
items = query_db('''
SELECT
se.lot_number,
se.item,
se.description,
GROUP_CONCAT(DISTINCT se.scanned_location) as scanned_location,
SUM(se.actual_weight) as actual_weight,
se.master_expected_location,
se.master_expected_weight,
GROUP_CONCAT(DISTINCT u.full_name) as scanned_by_name,
MIN(se.scan_timestamp) as scan_timestamp,
bic.system_bin as current_system_location,
bic.system_quantity as current_system_weight
FROM ScanEntries se
JOIN Users u ON se.scanned_by = u.user_id
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
WHERE se.session_id = ?
AND se.duplicate_status IN ('01', '03', '04')
AND se.is_deleted = 0
GROUP BY se.lot_number
ORDER BY se.lot_number
''', [session_id])
elif status == 'wrong_location':
# Wrong location lots - JOIN with CURRENT
items = query_db('''
SELECT
se.*,
u.full_name as scanned_by_name,
bic.system_bin as current_system_location,
bic.system_quantity as current_system_weight
FROM ScanEntries se
JOIN Users u ON se.scanned_by = u.user_id
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
WHERE se.session_id = ?
AND se.master_status = 'wrong_location'
AND se.is_deleted = 0
ORDER BY se.scan_timestamp DESC
''', [session_id])
elif status == 'weight_discrepancy':
# Weight discrepancies (right location, wrong weight) - JOIN with CURRENT
items = query_db('''
SELECT
se.*,
u.full_name as scanned_by_name,
bic.system_bin as current_system_location,
bic.system_quantity as current_system_weight
FROM ScanEntries se
JOIN Users u ON se.scanned_by = u.user_id
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
WHERE se.session_id = ?
AND se.master_status = 'match'
AND se.duplicate_status = '00'
AND ABS(se.actual_weight - se.master_expected_weight) >= 0.01
AND se.is_deleted = 0
ORDER BY ABS(se.actual_weight - se.master_expected_weight) DESC
''', [session_id])
elif status == 'ghost_lot':
# Ghost lots (not in master baseline) - JOIN with CURRENT
items = query_db('''
SELECT
se.*,
u.full_name as scanned_by_name,
bic.system_bin as current_system_location,
bic.system_quantity as current_system_weight
FROM ScanEntries se
JOIN Users u ON se.scanned_by = u.user_id
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
WHERE se.session_id = ?
AND se.master_status = 'ghost_lot'
AND se.is_deleted = 0
ORDER BY se.scan_timestamp DESC
''', [session_id])
elif status == 'missing':
# Missing lots (in master but not scanned)
items = query_db('''
SELECT
ml.lot_number,
ml.item,
bim.description,
ml.master_expected_location as system_bin,
ml.master_expected_quantity as system_quantity
FROM MissingLots ml
LEFT JOIN BaselineInventory_Master bim ON
ml.lot_number = bim.lot_number AND
ml.item = bim.item AND
ml.master_expected_location = bim.system_bin AND
ml.session_id = bim.session_id
WHERE ml.session_id = ?
GROUP BY ml.lot_number, ml.item, ml.master_expected_location
ORDER BY ml.master_expected_location, ml.lot_number
''', [session_id])
else:
return jsonify({'success': False, 'message': 'Invalid status'})
return jsonify({
'success': True,
'items': [dict(item) for item in items] if items else []
})
except Exception as e:
print(f"Error in get_status_details: {str(e)}")
return jsonify({'success': False, 'message': f'Error: {str(e)}'})
@bp.route('/session/<int:session_id>/archive', methods=['POST'])
@role_required('owner', 'admin')
def archive_session(session_id):
"""Archive a count session"""
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True)
if not sess:
return jsonify({'success': False, 'message': 'Session not found'})
if sess['status'] == 'archived':
return jsonify({'success': False, 'message': 'Session is already archived'})
execute_db('UPDATE CountSessions SET status = ? WHERE session_id = ?', ['archived', session_id])
return jsonify({'success': True, 'message': 'Session archived successfully'})
@bp.route('/session/<int:session_id>/activate', methods=['POST'])
@role_required('owner', 'admin')
def activate_session(session_id):
"""Reactivate an archived session"""
sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True)
if not sess:
return jsonify({'success': False, 'message': 'Session not found'})
if sess['status'] != 'archived':
return jsonify({'success': False, 'message': 'Session is not archived'})
execute_db('UPDATE CountSessions SET status = ? WHERE session_id = ?', ['active', session_id])
return jsonify({'success': True, 'message': 'Session activated successfully'})
@bp.route('/session/<int:session_id>/get_stats')
@role_required('owner', 'admin')
def get_session_stats(session_id):
stats = query_db('''
SELECT
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.master_variance_lbs = 0 AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) < 0.01) as matched,
COUNT(DISTINCT se.lot_number) FILTER (WHERE se.duplicate_status IN ('01', '03', '04') AND se.is_deleted = 0) as duplicates,
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) >= 0.01) as discrepancy,
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'wrong_location' AND se.is_deleted = 0) as wrong_location,
COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'ghost_lot' AND se.is_deleted = 0) as ghost_lots,
COUNT(DISTINCT ml.missing_id) as missing
FROM CountSessions cs
LEFT JOIN ScanEntries se ON cs.session_id = se.session_id
LEFT JOIN MissingLots ml ON cs.session_id = ml.session_id
WHERE cs.session_id = ?
''', [session_id], one=True)
return jsonify(success=True, stats=dict(stats))
@bp.route('/session/<int:session_id>/active-counters-fragment')
@role_required('owner', 'admin')
def active_counters_fragment(session_id):
"""Get currently active counters based on recent scan activity"""
active_counters = query_db('''
SELECT
u.full_name,
lc.location_name,
MAX(se.scan_timestamp) AS start_timestamp
FROM ScanEntries se
JOIN LocationCounts lc ON se.location_count_id = lc.location_count_id
JOIN Users u ON se.scanned_by = u.user_id
WHERE lc.session_id = ?
AND lc.status = 'in_progress'
AND lc.is_deleted = 0
AND se.scan_timestamp >= datetime('now', '-15 minutes')
GROUP BY u.user_id, lc.location_name
ORDER BY MAX(se.scan_timestamp) DESC
''', [session_id])
return render_template('invcount/partials/_active_counters.html', active_counters=active_counters)
# =========================================================================
# ADMIN LOCATION ROUTES (from admin_locations.py)
# =========================================================================
@bp.route('/location/<int:location_count_id>/reopen', methods=['POST'])
@login_required
def reopen_location(location_count_id):
"""Reopen a completed location (admin/owner only)"""
# Check permissions
user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True)
if not user or user['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'}), 403
# Verify location exists
loc = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True)
if not loc:
return jsonify({'success': False, 'message': 'Location not found'})
# Reopen the location
execute_db('''
UPDATE LocationCounts
SET status = 'in_progress', end_timestamp = NULL
WHERE location_count_id = ?
''', [location_count_id])
return jsonify({'success': True, 'message': 'Bin reopened for counting'})
@bp.route('/location/<int:location_count_id>/scans')
@login_required
def get_location_scans(location_count_id):
"""Get all scans for a specific location (admin/owner only)"""
# Check permissions
user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True)
if not user or user['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'}), 403
try:
scans = query_db('''
SELECT
se.*,
bic.system_bin as current_system_location,
bic.system_quantity as current_system_weight
FROM ScanEntries se
LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number
WHERE se.location_count_id = ?
AND se.is_deleted = 0
ORDER BY se.scan_timestamp DESC
''', [location_count_id])
# Convert Row objects to dicts
scans_list = [dict(scan) for scan in scans] if scans else []
return jsonify({'success': True, 'scans': scans_list})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@bp.route('/location/<int:location_count_id>/delete', methods=['POST'])
@login_required
def soft_delete_location(location_count_id):
"""Admin-only: Soft delete a bin count and its associated data"""
if session.get('role') not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Admin role required'}), 403
# 1. Verify location exists
loc = query_db('SELECT session_id, location_name FROM LocationCounts WHERE location_count_id = ?',
[location_count_id], one=True)
if not loc:
return jsonify({'success': False, 'message': 'Location not found'})
# 2. Soft delete the bin count itself
execute_db('''
UPDATE LocationCounts
SET is_deleted = 1
WHERE location_count_id = ?
''', [location_count_id])
# 3. Soft delete all scans in that bin
execute_db('''
UPDATE ScanEntries
SET is_deleted = 1
WHERE location_count_id = ?
''', [location_count_id])
# 4. Remove any MissingLots records generated for this bin
execute_db('''
DELETE FROM MissingLots
WHERE session_id = ? AND master_expected_location = ?
''', [loc['session_id'], loc['location_name']])
return jsonify({'success': True, 'message': 'Bin count and associated data soft-deleted'})
# --- DATA IMPORT ROUTES ---
@bp.route('/upload_current/<int:session_id>', methods=['POST'])
@login_required
def upload_current(session_id):
"""Upload current inventory CSV (global baseline)"""
if 'csv_file' not in request.files:
flash('No file part', 'danger')
return redirect(url_for('invcount.session_detail', session_id=session_id))
file = request.files['csv_file']
if file.filename == '':
flash('No selected file', 'danger')
return redirect(url_for('invcount.session_detail', session_id=session_id))
if file:
conn = get_db()
cursor = conn.cursor()
try:
stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
csv_input = csv.DictReader(stream)
# 1. Reset Table
cursor.execute('DROP TABLE IF EXISTS BaselineInventory_Current')
cursor.execute('''
CREATE TABLE BaselineInventory_Current (
id INTEGER PRIMARY KEY AUTOINCREMENT,
item TEXT,
lot_number TEXT,
system_bin TEXT,
system_quantity REAL,
uom TEXT
)
''')
# 2. BULK INSERT with Correct Headers
rows_to_insert = []
for row in csv_input:
# Clean up keys (remove hidden characters/spaces)
row = {k.strip(): v for k, v in row.items()}
rows_to_insert.append((
row.get('Item', ''),
row.get('Lot Number', ''),
row.get('Bin Number', ''),
row.get('On Hand', 0),
row.get('UOM', 'LBS')
))
cursor.executemany('''
INSERT INTO BaselineInventory_Current
(item, lot_number, system_bin, system_quantity, uom)
VALUES (?, ?, ?, ?, ?)
''', rows_to_insert)
# 3. Update timestamp
cursor.execute('UPDATE CountSessions SET current_baseline_timestamp = CURRENT_TIMESTAMP')
conn.commit()
flash(f'Successfully uploaded {len(rows_to_insert)} records.', 'success')
except Exception as e:
conn.rollback()
flash(f'Error uploading CSV: {str(e)}', 'danger')
finally:
conn.close()
return redirect(url_for('invcount.session_detail', session_id=session_id))
@bp.route('/invcount/session/<int:session_id>/upload_master', methods=['POST'])
@login_required
def upload_master(session_id):
"""Upload master baseline CSV (session-specific)"""
if 'csv_file' not in request.files:
flash('No file uploaded', 'danger')
return redirect(url_for('invcount.session_detail', session_id=session_id))
file = request.files['csv_file']
if file.filename == '':
flash('No file selected', 'danger')
return redirect(url_for('invcount.session_detail', session_id=session_id))
conn = get_db()
cursor = conn.cursor()
cursor.execute('DELETE FROM BaselineInventory_Master WHERE session_id = ?', [session_id])
try:
stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
csv_reader = csv.DictReader(stream)
lot_location_data = {}
# Consolidate duplicates in memory
for row in csv_reader:
# Clean keys here too just in case
row = {k.strip(): v for k, v in row.items()}
lot_num = row.get('Lot Number', '').strip()
bin_num = row.get('Bin Number', '').strip()
key = (lot_num, bin_num)
qty = float(row.get('On Hand', 0))
if key in lot_location_data:
lot_location_data[key]['quantity'] += qty
else:
lot_location_data[key] = {
'item': row.get('Item', '').strip(),
'description': row.get('Description', '').strip(),
'location': row.get('Location', '').strip(),
'bin': bin_num,
'quantity': qty
}
# BULK INSERT
rows_to_insert = []
for (lot_num, bin_num), data in lot_location_data.items():
rows_to_insert.append((
session_id,
lot_num,
data['item'],
data['description'],
data['location'],
data['bin'],
data['quantity']
))
cursor.executemany('''
INSERT INTO BaselineInventory_Master
(session_id, lot_number, item, description, system_location, system_bin, system_quantity)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', rows_to_insert)
cursor.execute('UPDATE CountSessions SET master_baseline_timestamp = CURRENT_TIMESTAMP WHERE session_id = ?', [session_id])
conn.commit()
flash(f'✅ MASTER baseline uploaded: {len(rows_to_insert)} records', 'success')
except Exception as e:
conn.rollback()
flash(f'Error uploading Master CSV: {str(e)}', 'danger')
finally:
conn.close()
return redirect(url_for('invcount.session_detail', session_id=session_id))