""" Inventory Counts Module - Routes Consolidated from counting.py, sessions.py, and admin_locations.py """ import csv import io from flask import render_template, request, redirect, url_for, flash, jsonify, session, current_app from db import query_db, execute_db, get_db from utils import login_required, role_required def register_routes(bp): """Register all invcount routes on the blueprint""" # ========================================================================= # HELPER FUNCTIONS # ========================================================================= def get_active_session(session_id): """Get session if it exists and is not archived. Returns None if invalid.""" sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True) if not sess or sess['status'] == 'archived': return None return sess # ========================================================================= # COUNTING ROUTES (from counting.py) # ========================================================================= @bp.route('/admin') @login_required def admin_dashboard(): """Admin dashboard for Counts module""" # Security check: Ensure user is admin/owner if session.get('role') not in ['owner', 'admin']: flash('Access denied. Admin role required.', 'danger') return redirect(url_for('invcount.index')) show_archived = request.args.get('show_archived', '0') == '1' if show_archived: sessions_list = query_db(''' SELECT s.*, u.full_name as created_by_name, COUNT(DISTINCT lc.location_count_id) as total_locations, SUM(CASE WHEN lc.status = 'completed' THEN 1 ELSE 0 END) as completed_locations, SUM(CASE WHEN lc.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_locations FROM CountSessions s LEFT JOIN Users u ON s.created_by = u.user_id LEFT JOIN LocationCounts lc ON s.session_id = lc.session_id WHERE s.status IN ('active', 'archived') GROUP BY s.session_id ORDER BY s.status ASC, s.created_timestamp DESC ''') else: sessions_list = query_db(''' SELECT s.*, u.full_name as created_by_name, COUNT(DISTINCT lc.location_count_id) as total_locations, SUM(CASE WHEN lc.status = 'completed' THEN 1 ELSE 0 END) as completed_locations, SUM(CASE WHEN lc.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_locations FROM CountSessions s LEFT JOIN Users u ON s.created_by = u.user_id LEFT JOIN LocationCounts lc ON s.session_id = lc.session_id WHERE s.status = 'active' GROUP BY s.session_id ORDER BY s.created_timestamp DESC ''') return render_template('invcount/admin_dashboard.html', sessions=sessions_list, show_archived=show_archived) @bp.route('/') @login_required def index(): """Counts module landing - show active sessions""" user_id = session.get('user_id') has_access = query_db(''' SELECT 1 FROM UserModules um JOIN Modules m ON um.module_id = m.module_id WHERE um.user_id = ? AND m.module_key = 'invcount' AND m.is_active = 1 ''', [user_id], one=True) if not has_access: flash('You do not have access to this module', 'danger') return redirect(url_for('home')) active_sessions = query_db(''' SELECT session_id, session_name, session_type, created_timestamp FROM CountSessions WHERE status = 'active' ORDER BY created_timestamp DESC ''') return render_template('invcount/staff_dashboard.html', sessions=active_sessions) @bp.route('/count/') @login_required def count_session(session_id): """Select session and begin counting""" sess = query_db('SELECT * FROM CountSessions WHERE session_id = ? AND status = "active"', [session_id], one=True) if not sess: flash('Session not found or not active', 'danger') return redirect(url_for('invcount.index')) # Redirect to my_counts page (staff can manage multiple bins) return redirect(url_for('invcount.my_counts', session_id=session_id)) @bp.route('/session//my-counts') @login_required def my_counts(session_id): """Staff view of their active and completed bins""" sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True) if not sess: flash('Session not found', 'danger') return redirect(url_for('invcount.index')) if sess['status'] == 'archived': flash('This session has been archived', 'warning') return redirect(url_for('invcount.index')) # Get this user's active bins active_bins = query_db(''' SELECT lc.*, COUNT(se.entry_id) as scan_count FROM LocationCounts lc LEFT JOIN ScanEntries se ON lc.location_count_id = se.location_count_id AND se.is_deleted = 0 WHERE lc.session_id = ? AND lc.status = 'in_progress' AND lc.is_deleted = 0 AND ( lc.counted_by = ? OR lc.location_count_id IN ( SELECT location_count_id FROM ScanEntries WHERE scanned_by = ? AND is_deleted = 0 ) ) GROUP BY lc.location_count_id ORDER BY lc.start_timestamp DESC ''', [session_id, session['user_id'], session['user_id']]) # Get this user's completed bins completed_bins = query_db(''' SELECT lc.*, COUNT(se.entry_id) as scan_count FROM LocationCounts lc LEFT JOIN ScanEntries se ON lc.location_count_id = se.location_count_id AND se.is_deleted = 0 WHERE lc.session_id = ? AND lc.status = 'completed' AND ( lc.counted_by = ? OR lc.location_count_id IN ( SELECT location_count_id FROM ScanEntries WHERE scanned_by = ? AND is_deleted = 0 ) ) GROUP BY lc.location_count_id ORDER BY lc.start_timestamp DESC ''', [session_id, session['user_id'], session['user_id']]) return render_template('/invcount/my_counts.html', count_session=sess, active_bins=active_bins, completed_bins=completed_bins) @bp.route('/session//start-bin', methods=['POST']) @login_required def start_bin_count(session_id): """Start counting a new bin or resume an existing in-progress one""" sess = get_active_session(session_id) if not sess: flash('Session not found or archived', 'warning') return redirect(url_for('invcount.index')) if not sess['master_baseline_timestamp']: flash('Master File not uploaded. Please upload it before starting bins.', 'warning') return redirect(url_for('invcount.my_counts', session_id=session_id)) location_name = request.form.get('location_name', '').strip().upper() if not location_name: flash('Bin number is required', 'danger') return redirect(url_for('invcount.my_counts', session_id=session_id)) # --- NEW LOGIC: Check for existing in-progress bin --- existing_bin = query_db(''' SELECT location_count_id FROM LocationCounts WHERE session_id = ? AND location_name = ? AND status = 'in_progress' ''', [session_id, location_name], one=True) if existing_bin: flash(f'Resuming bin: {location_name}', 'info') return redirect(url_for('invcount.count_location', session_id=session_id, location_count_id=existing_bin['location_count_id'])) # --- END NEW LOGIC --- # Count expected lots from MASTER baseline for this location expected_lots = query_db(''' SELECT COUNT(DISTINCT lot_number) as count FROM BaselineInventory_Master WHERE session_id = ? AND system_bin = ? ''', [session_id, location_name], one=True) expected_count = expected_lots['count'] if expected_lots else 0 # Create new location count if none existed conn = get_db() cursor = conn.cursor() cursor.execute(''' INSERT INTO LocationCounts (session_id, location_name, counted_by, status, start_timestamp, expected_lots_master) VALUES (?, ?, ?, 'in_progress', CURRENT_TIMESTAMP, ?) ''', [session_id, location_name, session['user_id'], expected_count]) location_count_id = cursor.lastrowid conn.commit() conn.close() flash(f'Started counting bin: {location_name}', 'success') return redirect(url_for('invcount.count_location', session_id=session_id, location_count_id=location_count_id)) @bp.route('/location//complete', methods=['POST']) @login_required def complete_location(location_count_id): """Mark a location count as complete (Simple toggle)""" # Verify ownership loc = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True) if not loc: return jsonify({'success': False, 'message': 'Location not found'}) if loc['counted_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']: return jsonify({'success': False, 'message': 'Permission denied'}) # Mark as completed execute_db(''' UPDATE LocationCounts SET status = 'completed', end_timestamp = CURRENT_TIMESTAMP WHERE location_count_id = ? ''', [location_count_id]) return jsonify({'success': True, 'message': 'Bin marked as complete'}) @bp.route('/count//location/') @login_required def count_location(session_id, location_count_id): """Count lots in a specific location""" # Get session info to determine type (Cycle Count vs Physical) sess = get_active_session(session_id) if not sess: flash('Session not found or archived', 'warning') return redirect(url_for('invcount.index')) if not sess['master_baseline_timestamp']: flash('Master File not uploaded. Please upload it before starting bins.', 'warning') return redirect(url_for('invcount.my_counts', session_id=session_id)) location = query_db(''' SELECT * FROM LocationCounts WHERE location_count_id = ? AND session_id = ? ''', [location_count_id, session_id], one=True) if not location: flash('Location not found', 'danger') return redirect(url_for('invcount.count_session', session_id=session_id)) # Check if location is completed and user is staff (not admin/owner) if location['status'] == 'completed' and session['role'] == 'staff': flash(f'Location {location["location_name"]} has been finalized and cannot accept new scans', 'warning') return redirect(url_for('invcount.my_counts', session_id=session_id)) # Get scans for this location (Scanned Lots) scans = query_db(''' SELECT * FROM ScanEntries WHERE location_count_id = ? AND is_deleted = 0 ORDER BY scan_timestamp DESC ''', [location_count_id]) # NEW LOGIC: Get Expected Lots for Cycle Counts (Grouped & Summed) expected_lots = [] if sess and sess['session_type'] == 'cycle_count': expected_lots = query_db(''' SELECT lot_number, MAX(item) as item, -- Pick one item code if they differ (rare) SUM(system_quantity) as total_weight FROM BaselineInventory_Master WHERE session_id = ? AND system_bin = ? AND lot_number NOT IN ( SELECT lot_number FROM ScanEntries WHERE location_count_id = ? AND is_deleted = 0 ) GROUP BY lot_number ORDER BY lot_number ''', [session_id, location['location_name'], location_count_id]) return render_template('invcount/count_location.html', session_id=session_id, location=location, scans=scans, expected_lots=expected_lots, session_type=sess['session_type'] if sess else '') @bp.route('/count//location//scan', methods=['POST']) @login_required def scan_lot(session_id, location_count_id): """Process a lot scan with duplicate detection""" sess = get_active_session(session_id) if not sess: return jsonify({'success': False, 'message': 'Session not found or archived'}) data = request.get_json() lot_number = data.get('lot_number', '').strip() weight = data.get('weight') confirm_duplicate = data.get('confirm_duplicate', False) check_only = data.get('check_only', False) # Just checking for duplicates, not saving if not lot_number: return jsonify({'success': False, 'message': 'Lot number required'}) if not check_only and not weight: return jsonify({'success': False, 'message': 'Weight required'}) if not check_only: try: weight = float(weight) except ValueError: return jsonify({'success': False, 'message': 'Invalid weight value'}) # Get location info location = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True) # Check for duplicates in this session existing_scans = query_db(''' SELECT se.*, lc.location_name, u.full_name FROM ScanEntries se JOIN LocationCounts lc ON se.location_count_id = lc.location_count_id JOIN Users u ON se.scanned_by = u.user_id WHERE se.session_id = ? AND se.lot_number = ? AND se.is_deleted = 0 ''', [session_id, lot_number]) duplicate_status = '00' # Default: no duplicate duplicate_info = None needs_confirmation = False if existing_scans: # Check for same location duplicates (by this user) same_location = [s for s in existing_scans if s['location_name'] == location['location_name'] and s['scanned_by'] == session['user_id']] # Check for different location duplicates (by anyone) diff_location = [s for s in existing_scans if s['location_name'] != location['location_name']] if same_location and diff_location: # Status 04: Duplicate in both same and different locations duplicate_status = '04' other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location)) duplicate_info = f"Also found in {other_locs}. Duplicate Lot" needs_confirmation = True elif same_location: # Status 01: Duplicate in same location only duplicate_status = '01' duplicate_info = "Duplicate" needs_confirmation = True elif diff_location: # Status 03: Duplicate in different location only duplicate_status = '03' other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location)) duplicate_info = f"Also found in {other_locs}" # If just checking, return early with baseline info if check_only: # Get baseline info to show what they're scanning master_info = query_db(''' SELECT item, description FROM BaselineInventory_Master WHERE session_id = ? AND lot_number = ? LIMIT 1 ''', [session_id, lot_number], one=True) if needs_confirmation: return jsonify({ 'success': False, 'needs_confirmation': True, 'message': 'Lot already scanned, Are you sure?', 'duplicate_status': duplicate_status, 'item': master_info['item'] if master_info else None, 'description': master_info['description'] if master_info else None }) else: return jsonify({ 'success': True, 'needs_confirmation': False, 'item': master_info['item'] if master_info else None, 'description': master_info['description'] if master_info else None }) # If needs confirmation and not yet confirmed, ask user if needs_confirmation and not confirm_duplicate: return jsonify({ 'success': False, 'needs_confirmation': True, 'message': 'Lot already scanned, Are you sure?', 'duplicate_status': duplicate_status }) # Check against MASTER baseline master = query_db(''' SELECT system_bin, SUM(system_quantity) as system_quantity, MAX(item) as item, MAX(description) as description FROM BaselineInventory_Master WHERE session_id = ? AND lot_number = ? AND system_bin = ? GROUP BY system_bin ''', [session_id, lot_number, location['location_name']], one=True) # Determine master_status (only if not a duplicate issue) if duplicate_status == '00': if master: # Lot exists in correct location master_status = 'match' if master['system_quantity'] is not None: variance_lbs = weight - master['system_quantity'] variance_pct = (variance_lbs / master['system_quantity'] * 100) if master['system_quantity'] > 0 else 0 else: variance_lbs = None variance_pct = None else: # Check if lot exists in different location master_other = query_db(''' SELECT system_bin, SUM(system_quantity) as system_quantity, MAX(item) as item, MAX(description) as description FROM BaselineInventory_Master WHERE session_id = ? AND lot_number = ? GROUP BY system_bin ORDER BY system_bin LIMIT 1 ''', [session_id, lot_number], one=True) if master_other: master_status = 'wrong_location' master = master_other variance_lbs = None variance_pct = None else: # Ghost lot master_status = 'ghost_lot' variance_lbs = None variance_pct = None else: # For duplicates, still check baseline for item info if not master: master = query_db(''' SELECT system_bin, SUM(system_quantity) as system_quantity, MAX(item) as item, MAX(description) as description FROM BaselineInventory_Master WHERE session_id = ? AND lot_number = ? GROUP BY system_bin ORDER BY system_bin LIMIT 1 ''', [session_id, lot_number], one=True) master_status = 'match' # Don't override with wrong_location for duplicates variance_lbs = None variance_pct = None # Insert scan entry_id = execute_db(''' INSERT INTO ScanEntries (session_id, location_count_id, lot_number, item, description, scanned_location, actual_weight, scanned_by, master_status, master_expected_location, master_expected_weight, master_variance_lbs, master_variance_pct, duplicate_status, duplicate_info, comment) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', [ session_id, location_count_id, lot_number, master['item'] if master else None, master['description'] if master else None, location['location_name'], weight, session['user_id'], master_status, master['system_bin'] if master else None, master['system_quantity'] if master else None, variance_lbs, variance_pct, duplicate_status, duplicate_info, duplicate_info ]) # If this is a confirmed duplicate (01 or 04), update previous scans in same location updated_entry_ids = [] if duplicate_status in ['01', '04'] and confirm_duplicate: same_location_ids = [s['entry_id'] for s in existing_scans if s['location_name'] == location['location_name'] and s['scanned_by'] == session['user_id']] for scan_id in same_location_ids: execute_db(''' UPDATE ScanEntries SET duplicate_status = ?, duplicate_info = ?, comment = ?, modified_timestamp = CURRENT_TIMESTAMP WHERE entry_id = ? ''', [duplicate_status, duplicate_info, duplicate_info, scan_id]) updated_entry_ids.append(scan_id) # Update location count execute_db(''' UPDATE LocationCounts SET lots_found = lots_found + 1 WHERE location_count_id = ? ''', [location_count_id]) return jsonify({ 'success': True, 'entry_id': entry_id, 'master_status': master_status, 'duplicate_status': duplicate_status, 'duplicate_info': duplicate_info, 'master_expected_location': master['system_bin'] if master else None, 'master_expected_weight': master['system_quantity'] if master else None, 'actual_weight': weight, 'variance_lbs': variance_lbs, 'item': master['item'] if master else 'Unknown Item', 'description': master['description'] if master else 'Not in system', 'updated_entry_ids': updated_entry_ids # IDs of scans that were updated to duplicate }) @bp.route('/scan//delete', methods=['POST']) @login_required def delete_scan(entry_id): """Soft delete a scan and recalculate duplicate statuses""" # Get the scan being deleted scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ? AND is_deleted = 0', [entry_id], one=True) if not scan: return jsonify({'success': False, 'message': 'Scan not found'}) # Only allow user to delete their own scans if scan['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']: return jsonify({'success': False, 'message': 'Permission denied'}) # Soft delete the scan execute_db(''' UPDATE ScanEntries SET is_deleted = 1, deleted_by = ?, deleted_timestamp = CURRENT_TIMESTAMP WHERE entry_id = ? ''', [session['user_id'], entry_id]) # Recalculate duplicate statuses for this lot number in this session updated_entries = recalculate_duplicate_status(scan['session_id'], scan['lot_number'], scan['scanned_location']) # Update location count execute_db(''' UPDATE LocationCounts SET lots_found = lots_found - 1 WHERE location_count_id = ? ''', [scan['location_count_id']]) return jsonify({ 'success': True, 'message': 'Scan deleted', 'updated_entries': updated_entries # Return which scans were updated }) @bp.route('/scan//details', methods=['GET']) @login_required def get_scan_details(entry_id): """Get detailed information about a scan""" scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ? AND is_deleted = 0', [entry_id], one=True) if not scan: return jsonify({'success': False, 'message': 'Scan not found'}) return jsonify({ 'success': True, 'scan': dict(scan) }) @bp.route('/scan//update', methods=['POST']) @login_required def update_scan(entry_id): """Update scan item, weight and comment""" data = request.get_json() item = data.get('item', '').strip() weight = data.get('weight') comment = data.get('comment', '') # Get the scan scan = query_db('SELECT * FROM ScanEntries WHERE entry_id = ? AND is_deleted = 0', [entry_id], one=True) if not scan: return jsonify({'success': False, 'message': 'Scan not found'}) # Only allow user to update their own scans if scan['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']: return jsonify({'success': False, 'message': 'Permission denied'}) try: weight = float(weight) except ValueError: return jsonify({'success': False, 'message': 'Invalid weight value'}) # Update the scan execute_db(''' UPDATE ScanEntries SET item = ?, actual_weight = ?, comment = ?, modified_timestamp = CURRENT_TIMESTAMP WHERE entry_id = ? and is_deleted = 0 ''', [item, weight, comment, entry_id]) return jsonify({'success': True, 'message': 'Scan updated'}) def recalculate_duplicate_status(session_id, lot_number, current_location): """Recalculate duplicate statuses for a lot after deletion""" # Track which entries were updated updated_entries = [] # Get all active scans for this lot in this session scans = query_db(''' SELECT se.*, lc.location_name, u.full_name, u.user_id as scan_user_id FROM ScanEntries se JOIN LocationCounts lc ON se.location_count_id = lc.location_count_id JOIN Users u ON se.scanned_by = u.user_id WHERE se.session_id = ? AND se.lot_number = ? AND se.is_deleted = 0 ORDER BY se.scan_timestamp ''', [session_id, lot_number]) if not scans: return updated_entries # Reset all to status 00 for scan in scans: execute_db(''' UPDATE ScanEntries SET duplicate_status = '00', duplicate_info = NULL, comment = NULL, modified_timestamp = CURRENT_TIMESTAMP WHERE entry_id = ? and is_deleted = 0 ''', [scan['entry_id']]) updated_entries.append({ 'entry_id': scan['entry_id'], 'duplicate_status': '00', 'duplicate_info': None }) # Recalculate statuses for i, scan in enumerate(scans): # Get previous scans (before this one chronologically) prev_scans = scans[:i] if not prev_scans: continue # First scan, stays 00 same_location = [s for s in prev_scans if s['location_name'] == scan['location_name'] and s['scan_user_id'] == scan['scan_user_id']] diff_location = [s for s in prev_scans if s['location_name'] != scan['location_name']] duplicate_status = '00' duplicate_info = None if same_location and diff_location: # Status 04 duplicate_status = '04' other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location)) duplicate_info = f"Also found in {other_locs}. Duplicate Lot" elif same_location: # Status 01 duplicate_status = '01' duplicate_info = "Duplicate" elif diff_location: # Status 03 duplicate_status = '03' other_locs = ', '.join(set(f"{s['location_name']} by {s['full_name']}" for s in diff_location)) duplicate_info = f"Also found in {other_locs}" # Update this scan if it changed from 00 if duplicate_status != '00': execute_db(''' UPDATE ScanEntries SET duplicate_status = ?, duplicate_info = ?, comment = ?, modified_timestamp = CURRENT_TIMESTAMP WHERE entry_id = ? and is_deleted = 0 ''', [duplicate_status, duplicate_info, duplicate_info, scan['entry_id']]) # Update our tracking list for entry in updated_entries: if entry['entry_id'] == scan['entry_id']: entry['duplicate_status'] = duplicate_status entry['duplicate_info'] = duplicate_info break # If status 01 or 04, also update previous scans in same location if duplicate_status in ['01', '04'] and same_location: for prev_scan in same_location: execute_db(''' UPDATE ScanEntries SET duplicate_status = ?, duplicate_info = ?, comment = ?, modified_timestamp = CURRENT_TIMESTAMP WHERE entry_id = ? and is_deleted = 0 ''', [duplicate_status, duplicate_info, duplicate_info, prev_scan['entry_id']]) # Update tracking for previous scans for entry in updated_entries: if entry['entry_id'] == prev_scan['entry_id']: entry['duplicate_status'] = duplicate_status entry['duplicate_info'] = duplicate_info break return updated_entries @bp.route('/count//location//finish', methods=['POST']) @login_required def finish_location(session_id, location_count_id): """Finish counting a location""" sess = get_active_session(session_id) if not sess: return jsonify({'success': False, 'message': 'Session not found or archived'}) # Get location info location = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True) if not location: return jsonify({'success': False, 'message': 'Location not found'}) # Mark location as completed execute_db(''' UPDATE LocationCounts SET status = 'completed', end_timestamp = CURRENT_TIMESTAMP WHERE location_count_id = ? ''', [location_count_id]) # V1.0: Mark missing lots from MASTER baseline that weren't scanned # Get all expected lots for this location from MASTER baseline expected_lots = query_db(''' SELECT lot_number, item, description, system_quantity FROM BaselineInventory_Master WHERE session_id = ? AND system_bin = ? ''', [session_id, location['location_name']]) # Get all scanned lots for this location scanned_lots = query_db(''' SELECT DISTINCT lot_number FROM ScanEntries WHERE location_count_id = ? AND is_deleted = 0 ''', [location_count_id]) scanned_lot_numbers = {s['lot_number'] for s in scanned_lots} # Insert missing lots for expected in expected_lots: if expected['lot_number'] not in scanned_lot_numbers: execute_db(''' INSERT INTO MissingLots (session_id, lot_number, master_expected_location, item, master_expected_quantity, marked_by) VALUES (?, ?, ?, ?, ?, ?) ''', [session_id, expected['lot_number'], location['location_name'], expected['item'], expected['system_quantity'], session['user_id']]) flash('Location count completed!', 'success') return jsonify({ 'success': True, 'redirect': url_for('invcount.count_session', session_id=session_id) }) @bp.route('/session//finalize-all', methods=['POST']) @login_required def finalize_all_locations(session_id): """Finalize all 'in_progress' locations in a session""" if session.get('role') not in ['owner', 'admin']: return jsonify({'success': False, 'message': 'Permission denied'}), 403 # 1. Get all in_progress locations for this session locations = query_db(''' SELECT location_count_id, location_name FROM LocationCounts WHERE session_id = ? AND status = 'in_progress' AND is_deleted = 0 ''', [session_id]) if not locations: return jsonify({'success': True, 'message': 'No open bins to finalize.'}) # 2. Loop through and run the finalize logic for each for loc in locations: # We reuse the logic from your existing finish_location route execute_db(''' UPDATE LocationCounts SET status = 'completed', end_timestamp = CURRENT_TIMESTAMP WHERE location_count_id = ? ''', [loc['location_count_id']]) # Identify missing lots from MASTER baseline expected_lots = query_db(''' SELECT lot_number, item, description, system_quantity FROM BaselineInventory_Master WHERE session_id = ? AND system_bin = ? ''', [session_id, loc['location_name']]) scanned_lots = query_db(''' SELECT DISTINCT lot_number FROM ScanEntries WHERE location_count_id = ? AND is_deleted = 0 ''', [loc['location_count_id']]) scanned_lot_numbers = {s['lot_number'] for s in scanned_lots} for expected in expected_lots: if expected['lot_number'] not in scanned_lot_numbers: execute_db(''' INSERT INTO MissingLots (session_id, lot_number, master_expected_location, item, master_expected_quantity, marked_by) VALUES (?, ?, ?, ?, ?, ?) ''', [session_id, expected['lot_number'], loc['location_name'], expected['item'], expected['system_quantity'], session['user_id']]) return jsonify({'success': True, 'message': f'Successfully finalized {len(locations)} bins.'}) # ========================================================================= # SESSION MANAGEMENT ROUTES (from sessions.py) # ========================================================================= @bp.route('/session/create', methods=['GET', 'POST']) @role_required('owner', 'admin') def create_session(): """Create new count session""" if request.method == 'POST': session_name = request.form.get('session_name', '').strip() session_type = request.form.get('session_type') if not session_name: flash('Session name is required', 'danger') return redirect(url_for('invcount.create_session')) session_id = execute_db(''' INSERT INTO CountSessions (session_name, session_type, created_by, branch) VALUES (?, ?, ?, ?) ''', [session_name, session_type, session['user_id'], 'Main']) flash(f'Session "{session_name}" created successfully!', 'success') return redirect(url_for('invcount.session_detail', session_id=session_id)) return render_template('invcount/create_session.html') @bp.route('/session/') @role_required('owner', 'admin') def session_detail(session_id): """Session detail and monitoring page""" sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True) if not sess: flash('Session not found', 'danger') return redirect(url_for('dashboard')) # Get statistics stats = query_db(''' SELECT COUNT(DISTINCT se.entry_id) FILTER (WHERE se.is_deleted = 0) as total_scans, COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) < 0.01) as matched, COUNT(DISTINCT se.lot_number) FILTER (WHERE se.duplicate_status IN ('01', '03', '04') AND se.is_deleted = 0) as duplicates, COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) >= 0.01) as weight_discrepancy, COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'wrong_location' AND se.is_deleted = 0) as wrong_location, COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'ghost_lot' AND se.is_deleted = 0) as ghost_lots, COUNT(DISTINCT ml.missing_id) as missing_lots FROM CountSessions cs LEFT JOIN ScanEntries se ON cs.session_id = se.session_id LEFT JOIN MissingLots ml ON cs.session_id = ml.session_id WHERE cs.session_id = ? ''', [session_id], one=True) # Get location progress # We add a subquery to count the actual missing lots for each bin locations = query_db(''' SELECT lc.*, u.full_name as counter_name, (SELECT COUNT(*) FROM MissingLots ml WHERE ml.session_id = lc.session_id AND ml.master_expected_location = lc.location_name) as lots_missing_calc FROM LocationCounts lc LEFT JOIN Users u ON lc.counted_by = u.user_id WHERE lc.session_id = ? AND lc.is_deleted = 0 ORDER BY lc.status DESC, lc.location_name ''', [session_id]) # Get active counters active_counters = query_db(''' SELECT u.full_name, u.user_id, MAX(lc.start_timestamp) AS start_timestamp, -- Add the alias here! lc.location_name FROM LocationCounts lc JOIN Users u ON lc.counted_by = u.user_id WHERE lc.session_id = ? AND lc.status = 'in_progress' AND lc.is_deleted = 0 GROUP BY u.user_id ORDER BY start_timestamp DESC ''', [session_id]) return render_template('invcount/session_detail.html', count_session=sess, stats=stats, locations=locations, active_counters=active_counters) @bp.route('/session//status-details/') @role_required('owner', 'admin') def get_status_details(session_id, status): """Get detailed breakdown for a specific status""" try: if status == 'match': # Matched lots (not duplicates) - JOIN with CURRENT for live data items = query_db(''' SELECT se.*, u.full_name as scanned_by_name, bic.system_bin as current_system_location, bic.system_quantity as current_system_weight FROM ScanEntries se JOIN Users u ON se.scanned_by = u.user_id LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number WHERE se.session_id = ? AND se.master_status = 'match' AND se.duplicate_status = '00' AND se.master_variance_lbs = 0 AND se.is_deleted = 0 ORDER BY se.scan_timestamp DESC ''', [session_id]) elif status == 'duplicates': # Duplicate lots (grouped by lot number) - JOIN with CURRENT items = query_db(''' SELECT se.lot_number, se.item, se.description, GROUP_CONCAT(DISTINCT se.scanned_location) as scanned_location, SUM(se.actual_weight) as actual_weight, se.master_expected_location, se.master_expected_weight, GROUP_CONCAT(DISTINCT u.full_name) as scanned_by_name, MIN(se.scan_timestamp) as scan_timestamp, bic.system_bin as current_system_location, bic.system_quantity as current_system_weight FROM ScanEntries se JOIN Users u ON se.scanned_by = u.user_id LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number WHERE se.session_id = ? AND se.duplicate_status IN ('01', '03', '04') AND se.is_deleted = 0 GROUP BY se.lot_number ORDER BY se.lot_number ''', [session_id]) elif status == 'wrong_location': # Wrong location lots - JOIN with CURRENT items = query_db(''' SELECT se.*, u.full_name as scanned_by_name, bic.system_bin as current_system_location, bic.system_quantity as current_system_weight FROM ScanEntries se JOIN Users u ON se.scanned_by = u.user_id LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number WHERE se.session_id = ? AND se.master_status = 'wrong_location' AND se.is_deleted = 0 ORDER BY se.scan_timestamp DESC ''', [session_id]) elif status == 'weight_discrepancy': # Weight discrepancies (right location, wrong weight) - JOIN with CURRENT items = query_db(''' SELECT se.*, u.full_name as scanned_by_name, bic.system_bin as current_system_location, bic.system_quantity as current_system_weight FROM ScanEntries se JOIN Users u ON se.scanned_by = u.user_id LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number WHERE se.session_id = ? AND se.master_status = 'match' AND se.duplicate_status = '00' AND ABS(se.actual_weight - se.master_expected_weight) >= 0.01 AND se.is_deleted = 0 ORDER BY ABS(se.actual_weight - se.master_expected_weight) DESC ''', [session_id]) elif status == 'ghost_lot': # Ghost lots (not in master baseline) - JOIN with CURRENT items = query_db(''' SELECT se.*, u.full_name as scanned_by_name, bic.system_bin as current_system_location, bic.system_quantity as current_system_weight FROM ScanEntries se JOIN Users u ON se.scanned_by = u.user_id LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number WHERE se.session_id = ? AND se.master_status = 'ghost_lot' AND se.is_deleted = 0 ORDER BY se.scan_timestamp DESC ''', [session_id]) elif status == 'missing': # Missing lots (in master but not scanned) items = query_db(''' SELECT ml.lot_number, ml.item, bim.description, ml.master_expected_location as system_bin, ml.master_expected_quantity as system_quantity FROM MissingLots ml LEFT JOIN BaselineInventory_Master bim ON ml.lot_number = bim.lot_number AND ml.item = bim.item AND ml.master_expected_location = bim.system_bin AND ml.session_id = bim.session_id WHERE ml.session_id = ? GROUP BY ml.lot_number, ml.item, ml.master_expected_location ORDER BY ml.master_expected_location, ml.lot_number ''', [session_id]) else: return jsonify({'success': False, 'message': 'Invalid status'}) return jsonify({ 'success': True, 'items': [dict(item) for item in items] if items else [] }) except Exception as e: print(f"Error in get_status_details: {str(e)}") return jsonify({'success': False, 'message': f'Error: {str(e)}'}) @bp.route('/session//archive', methods=['POST']) @role_required('owner', 'admin') def archive_session(session_id): """Archive a count session""" sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True) if not sess: return jsonify({'success': False, 'message': 'Session not found'}) if sess['status'] == 'archived': return jsonify({'success': False, 'message': 'Session is already archived'}) execute_db('UPDATE CountSessions SET status = ? WHERE session_id = ?', ['archived', session_id]) return jsonify({'success': True, 'message': 'Session archived successfully'}) @bp.route('/session//activate', methods=['POST']) @role_required('owner', 'admin') def activate_session(session_id): """Reactivate an archived session""" sess = query_db('SELECT * FROM CountSessions WHERE session_id = ?', [session_id], one=True) if not sess: return jsonify({'success': False, 'message': 'Session not found'}) if sess['status'] != 'archived': return jsonify({'success': False, 'message': 'Session is not archived'}) execute_db('UPDATE CountSessions SET status = ? WHERE session_id = ?', ['active', session_id]) return jsonify({'success': True, 'message': 'Session activated successfully'}) @bp.route('/session//get_stats') @role_required('owner', 'admin') def get_session_stats(session_id): stats = query_db(''' SELECT COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.master_variance_lbs = 0 AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) < 0.01) as matched, COUNT(DISTINCT se.lot_number) FILTER (WHERE se.duplicate_status IN ('01', '03', '04') AND se.is_deleted = 0) as duplicates, COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'match' AND se.duplicate_status = '00' AND se.is_deleted = 0 AND ABS(se.actual_weight - se.master_expected_weight) >= 0.01) as discrepancy, COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'wrong_location' AND se.is_deleted = 0) as wrong_location, COUNT(DISTINCT se.entry_id) FILTER (WHERE se.master_status = 'ghost_lot' AND se.is_deleted = 0) as ghost_lots, COUNT(DISTINCT ml.missing_id) as missing FROM CountSessions cs LEFT JOIN ScanEntries se ON cs.session_id = se.session_id LEFT JOIN MissingLots ml ON cs.session_id = ml.session_id WHERE cs.session_id = ? ''', [session_id], one=True) return jsonify(success=True, stats=dict(stats)) @bp.route('/session//active-counters-fragment') @role_required('owner', 'admin') def active_counters_fragment(session_id): """Get currently active counters based on recent scan activity""" active_counters = query_db(''' SELECT u.full_name, lc.location_name, MAX(se.scan_timestamp) AS start_timestamp FROM ScanEntries se JOIN LocationCounts lc ON se.location_count_id = lc.location_count_id JOIN Users u ON se.scanned_by = u.user_id WHERE lc.session_id = ? AND lc.status = 'in_progress' AND lc.is_deleted = 0 AND se.scan_timestamp >= datetime('now', '-15 minutes') GROUP BY u.user_id, lc.location_name ORDER BY MAX(se.scan_timestamp) DESC ''', [session_id]) return render_template('invcount/partials/_active_counters.html', active_counters=active_counters) # ========================================================================= # ADMIN LOCATION ROUTES (from admin_locations.py) # ========================================================================= @bp.route('/location//reopen', methods=['POST']) @login_required def reopen_location(location_count_id): """Reopen a completed location (admin/owner only)""" # Check permissions user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True) if not user or user['role'] not in ['owner', 'admin']: return jsonify({'success': False, 'message': 'Permission denied'}), 403 # Verify location exists loc = query_db('SELECT * FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True) if not loc: return jsonify({'success': False, 'message': 'Location not found'}) # Reopen the location execute_db(''' UPDATE LocationCounts SET status = 'in_progress', end_timestamp = NULL WHERE location_count_id = ? ''', [location_count_id]) return jsonify({'success': True, 'message': 'Bin reopened for counting'}) @bp.route('/location//scans') @login_required def get_location_scans(location_count_id): """Get all scans for a specific location (admin/owner only)""" # Check permissions user = query_db('SELECT role FROM Users WHERE user_id = ?', [session['user_id']], one=True) if not user or user['role'] not in ['owner', 'admin']: return jsonify({'success': False, 'message': 'Permission denied'}), 403 try: scans = query_db(''' SELECT se.*, bic.system_bin as current_system_location, bic.system_quantity as current_system_weight FROM ScanEntries se LEFT JOIN BaselineInventory_Current bic ON se.lot_number = bic.lot_number WHERE se.location_count_id = ? AND se.is_deleted = 0 ORDER BY se.scan_timestamp DESC ''', [location_count_id]) # Convert Row objects to dicts scans_list = [dict(scan) for scan in scans] if scans else [] return jsonify({'success': True, 'scans': scans_list}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @bp.route('/location//delete', methods=['POST']) @login_required def soft_delete_location(location_count_id): """Admin-only: Soft delete a bin count and its associated data""" if session.get('role') not in ['owner', 'admin']: return jsonify({'success': False, 'message': 'Admin role required'}), 403 # 1. Verify location exists loc = query_db('SELECT session_id, location_name FROM LocationCounts WHERE location_count_id = ?', [location_count_id], one=True) if not loc: return jsonify({'success': False, 'message': 'Location not found'}) # 2. Soft delete the bin count itself execute_db(''' UPDATE LocationCounts SET is_deleted = 1 WHERE location_count_id = ? ''', [location_count_id]) # 3. Soft delete all scans in that bin execute_db(''' UPDATE ScanEntries SET is_deleted = 1 WHERE location_count_id = ? ''', [location_count_id]) # 4. Remove any MissingLots records generated for this bin execute_db(''' DELETE FROM MissingLots WHERE session_id = ? AND master_expected_location = ? ''', [loc['session_id'], loc['location_name']]) return jsonify({'success': True, 'message': 'Bin count and associated data soft-deleted'}) # --- DATA IMPORT ROUTES --- @bp.route('/upload_current/', methods=['POST']) @login_required def upload_current(session_id): """Upload current inventory CSV (global baseline)""" if 'csv_file' not in request.files: flash('No file part', 'danger') return redirect(url_for('invcount.session_detail', session_id=session_id)) file = request.files['csv_file'] if file.filename == '': flash('No selected file', 'danger') return redirect(url_for('invcount.session_detail', session_id=session_id)) if file: conn = get_db() cursor = conn.cursor() try: stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None) csv_input = csv.DictReader(stream) # 1. Reset Table cursor.execute('DROP TABLE IF EXISTS BaselineInventory_Current') cursor.execute(''' CREATE TABLE BaselineInventory_Current ( id INTEGER PRIMARY KEY AUTOINCREMENT, item TEXT, lot_number TEXT, system_bin TEXT, system_quantity REAL, uom TEXT ) ''') # 2. BULK INSERT with Correct Headers rows_to_insert = [] for row in csv_input: # Clean up keys (remove hidden characters/spaces) row = {k.strip(): v for k, v in row.items()} rows_to_insert.append(( row.get('Item', ''), row.get('Lot Number', ''), row.get('Bin Number', ''), row.get('On Hand', 0), row.get('UOM', 'LBS') )) cursor.executemany(''' INSERT INTO BaselineInventory_Current (item, lot_number, system_bin, system_quantity, uom) VALUES (?, ?, ?, ?, ?) ''', rows_to_insert) # 3. Update timestamp cursor.execute('UPDATE CountSessions SET current_baseline_timestamp = CURRENT_TIMESTAMP') conn.commit() flash(f'Successfully uploaded {len(rows_to_insert)} records.', 'success') except Exception as e: conn.rollback() flash(f'Error uploading CSV: {str(e)}', 'danger') finally: conn.close() return redirect(url_for('invcount.session_detail', session_id=session_id)) @bp.route('/invcount/session//upload_master', methods=['POST']) @login_required def upload_master(session_id): """Upload master baseline CSV (session-specific)""" if 'csv_file' not in request.files: flash('No file uploaded', 'danger') return redirect(url_for('invcount.session_detail', session_id=session_id)) file = request.files['csv_file'] if file.filename == '': flash('No file selected', 'danger') return redirect(url_for('invcount.session_detail', session_id=session_id)) conn = get_db() cursor = conn.cursor() cursor.execute('DELETE FROM BaselineInventory_Master WHERE session_id = ?', [session_id]) try: stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None) csv_reader = csv.DictReader(stream) lot_location_data = {} # Consolidate duplicates in memory for row in csv_reader: # Clean keys here too just in case row = {k.strip(): v for k, v in row.items()} lot_num = row.get('Lot Number', '').strip() bin_num = row.get('Bin Number', '').strip() key = (lot_num, bin_num) qty = float(row.get('On Hand', 0)) if key in lot_location_data: lot_location_data[key]['quantity'] += qty else: lot_location_data[key] = { 'item': row.get('Item', '').strip(), 'description': row.get('Description', '').strip(), 'location': row.get('Location', '').strip(), 'bin': bin_num, 'quantity': qty } # BULK INSERT rows_to_insert = [] for (lot_num, bin_num), data in lot_location_data.items(): rows_to_insert.append(( session_id, lot_num, data['item'], data['description'], data['location'], data['bin'], data['quantity'] )) cursor.executemany(''' INSERT INTO BaselineInventory_Master (session_id, lot_number, item, description, system_location, system_bin, system_quantity) VALUES (?, ?, ?, ?, ?, ?, ?) ''', rows_to_insert) cursor.execute('UPDATE CountSessions SET master_baseline_timestamp = CURRENT_TIMESTAMP WHERE session_id = ?', [session_id]) conn.commit() flash(f'✅ MASTER baseline uploaded: {len(rows_to_insert)} records', 'success') except Exception as e: conn.rollback() flash(f'Error uploading Master CSV: {str(e)}', 'danger') finally: conn.close() return redirect(url_for('invcount.session_detail', session_id=session_id))