Completed:

 Admin: Process creation, field configuration, template upload
 Staff: Session list, new session (header form), scanning interface
 Duplicate detection (same session = blue, other session = orange)
 Weight entry popup, edit/delete scans
This commit is contained in:
Javier
2026-01-28 12:53:59 -06:00
parent ac73045ef2
commit b11421a8f5
16 changed files with 2603 additions and 34 deletions

700
blueprints/cons_sheets.py Normal file
View File

@@ -0,0 +1,700 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, session
from db import query_db, execute_db
from utils import role_required
cons_sheets_bp = Blueprint('cons_sheets', __name__)
@cons_sheets_bp.route('/admin/consumption-sheets')
@role_required('owner', 'admin')
def admin_processes():
"""List all consumption sheet process types"""
processes = query_db('''
SELECT cp.*, u.full_name as created_by_name,
(SELECT COUNT(*) FROM cons_process_fields
WHERE process_id = cp.id AND is_active = 1) as field_count
FROM cons_processes cp
LEFT JOIN Users u ON cp.created_by = u.user_id
WHERE cp.is_active = 1
ORDER BY cp.process_name
''')
return render_template('cons_sheets/admin_processes.html', processes=processes)
@cons_sheets_bp.route('/admin/consumption-sheets/create', methods=['GET', 'POST'])
@role_required('owner', 'admin')
def create_process():
"""Create a new process type"""
if request.method == 'POST':
process_name = request.form.get('process_name', '').strip()
if not process_name:
flash('Process name is required', 'danger')
return redirect(url_for('cons_sheets.create_process'))
# Generate process_key from name (lowercase, underscores)
process_key = process_name.lower().replace(' ', '_').replace('-', '_')
# Remove any non-alphanumeric characters except underscore
process_key = ''.join(c for c in process_key if c.isalnum() or c == '_')
# Check for duplicate key
existing = query_db('SELECT id FROM cons_processes WHERE process_key = ?', [process_key], one=True)
if existing:
flash(f'A process with key "{process_key}" already exists', 'danger')
return redirect(url_for('cons_sheets.create_process'))
process_id = execute_db('''
INSERT INTO cons_processes (process_key, process_name, created_by)
VALUES (?, ?, ?)
''', [process_key, process_name, session['user_id']])
flash(f'Process "{process_name}" created successfully!', 'success')
return redirect(url_for('cons_sheets.process_detail', process_id=process_id))
return render_template('cons_sheets/create_process.html')
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>')
@role_required('owner', 'admin')
def process_detail(process_id):
"""Process detail page - Database and Excel configuration"""
process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
if not process:
flash('Process not found', 'danger')
return redirect(url_for('cons_sheets.admin_processes'))
# Get header fields
header_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'header' AND is_active = 1
ORDER BY sort_order, id
''', [process_id])
# Get detail fields
detail_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
ORDER BY sort_order, id
''', [process_id])
return render_template('cons_sheets/process_detail.html',
process=process,
header_fields=header_fields,
detail_fields=detail_fields)
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>/fields')
@role_required('owner', 'admin')
def process_fields(process_id):
"""Configure database fields for a process"""
process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
if not process:
flash('Process not found', 'danger')
return redirect(url_for('cons_sheets.admin_processes'))
# Get header fields
header_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'header' AND is_active = 1
ORDER BY sort_order, id
''', [process_id])
# Get detail fields
detail_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
ORDER BY sort_order, id
''', [process_id])
return render_template('cons_sheets/process_fields.html',
process=process,
header_fields=header_fields,
detail_fields=detail_fields)
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>/template')
@role_required('owner', 'admin')
def process_template(process_id):
"""Configure Excel template for a process"""
process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
if not process:
flash('Process not found', 'danger')
return redirect(url_for('cons_sheets.admin_processes'))
# Get all active fields for mapping display
header_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'header' AND is_active = 1
ORDER BY sort_order, id
''', [process_id])
detail_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
ORDER BY sort_order, id
''', [process_id])
return render_template('cons_sheets/process_template.html',
process=process,
header_fields=header_fields,
detail_fields=detail_fields)
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>/template/upload', methods=['POST'])
@role_required('owner', 'admin')
def upload_template(process_id):
"""Upload Excel template file"""
process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
if not process:
flash('Process not found', 'danger')
return redirect(url_for('cons_sheets.admin_processes'))
if 'template_file' not in request.files:
flash('No file selected', 'danger')
return redirect(url_for('cons_sheets.process_template', process_id=process_id))
file = request.files['template_file']
if file.filename == '':
flash('No file selected', 'danger')
return redirect(url_for('cons_sheets.process_template', process_id=process_id))
if not file.filename.endswith('.xlsx'):
flash('Only .xlsx files are allowed', 'danger')
return redirect(url_for('cons_sheets.process_template', process_id=process_id))
# Read file as binary
template_data = file.read()
filename = file.filename
# Store in database
execute_db('''
UPDATE cons_processes
SET template_file = ?, template_filename = ?
WHERE id = ?
''', [template_data, filename, process_id])
flash(f'Template "{filename}" uploaded successfully!', 'success')
return redirect(url_for('cons_sheets.process_template', process_id=process_id))
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>/template/settings', methods=['POST'])
@role_required('owner', 'admin')
def update_template_settings(process_id):
"""Update template page settings"""
process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
if not process:
flash('Process not found', 'danger')
return redirect(url_for('cons_sheets.admin_processes'))
rows_per_page = request.form.get('rows_per_page', 30)
detail_start_row = request.form.get('detail_start_row', 10)
try:
rows_per_page = int(rows_per_page)
detail_start_row = int(detail_start_row)
except ValueError:
flash('Invalid number values', 'danger')
return redirect(url_for('cons_sheets.process_template', process_id=process_id))
execute_db('''
UPDATE cons_processes
SET rows_per_page = ?, detail_start_row = ?
WHERE id = ?
''', [rows_per_page, detail_start_row, process_id])
flash('Settings updated successfully!', 'success')
return redirect(url_for('cons_sheets.process_template', process_id=process_id))
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>/template/download')
@role_required('owner', 'admin')
def download_template(process_id):
"""Download the stored Excel template"""
from flask import Response
process = query_db('SELECT template_file, template_filename FROM cons_processes WHERE id = ?', [process_id], one=True)
if not process or not process['template_file']:
flash('No template found', 'danger')
return redirect(url_for('cons_sheets.process_template', process_id=process_id))
return Response(
process['template_file'],
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
headers={'Content-Disposition': f'attachment; filename={process["template_filename"]}'}
)
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>/fields/add/<table_type>', methods=['GET', 'POST'])
@role_required('owner', 'admin')
def add_field(process_id, table_type):
"""Add a new field to a process"""
if table_type not in ['header', 'detail']:
flash('Invalid table type', 'danger')
return redirect(url_for('cons_sheets.process_fields', process_id=process_id))
process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
if not process:
flash('Process not found', 'danger')
return redirect(url_for('cons_sheets.admin_processes'))
if request.method == 'POST':
field_label = request.form.get('field_label', '').strip()
field_type = request.form.get('field_type', 'TEXT')
max_length = request.form.get('max_length', '')
is_required = 1 if request.form.get('is_required') else 0
excel_cell = request.form.get('excel_cell', '').strip().upper()
if not field_label:
flash('Field label is required', 'danger')
return redirect(url_for('cons_sheets.add_field', process_id=process_id, table_type=table_type))
# Generate field_name from label (lowercase, underscores)
field_name = field_label.lower().replace(' ', '_').replace('-', '_')
field_name = ''.join(c for c in field_name if c.isalnum() or c == '_')
# Check for duplicate field name in this process/table_type
existing = query_db('''
SELECT id FROM cons_process_fields
WHERE process_id = ? AND table_type = ? AND field_name = ? AND is_active = 1
''', [process_id, table_type, field_name], one=True)
if existing:
flash(f'A field with name "{field_name}" already exists', 'danger')
return redirect(url_for('cons_sheets.add_field', process_id=process_id, table_type=table_type))
# Get next sort_order
max_sort = query_db('''
SELECT MAX(sort_order) as max_sort FROM cons_process_fields
WHERE process_id = ? AND table_type = ?
''', [process_id, table_type], one=True)
sort_order = (max_sort['max_sort'] or 0) + 1
# Insert the field
execute_db('''
INSERT INTO cons_process_fields
(process_id, table_type, field_name, field_label, field_type, max_length, is_required, sort_order, excel_cell)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', [process_id, table_type, field_name, field_label, field_type,
int(max_length) if max_length else None, is_required, sort_order, excel_cell or None])
flash(f'Field "{field_label}" added successfully!', 'success')
return redirect(url_for('cons_sheets.process_fields', process_id=process_id))
return render_template('cons_sheets/add_field.html',
process=process,
table_type=table_type)
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>/fields/<int:field_id>/edit', methods=['GET', 'POST'])
@role_required('owner', 'admin')
def edit_field(process_id, field_id):
"""Edit an existing field"""
process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
field = query_db('SELECT * FROM cons_process_fields WHERE id = ? AND process_id = ?', [field_id, process_id], one=True)
if not process or not field:
flash('Process or field not found', 'danger')
return redirect(url_for('cons_sheets.admin_processes'))
if request.method == 'POST':
field_label = request.form.get('field_label', '').strip()
field_type = request.form.get('field_type', 'TEXT')
max_length = request.form.get('max_length', '')
is_required = 1 if request.form.get('is_required') else 0
excel_cell = request.form.get('excel_cell', '').strip().upper()
if not field_label:
flash('Field label is required', 'danger')
return redirect(url_for('cons_sheets.edit_field', process_id=process_id, field_id=field_id))
execute_db('''
UPDATE cons_process_fields
SET field_label = ?, field_type = ?, max_length = ?, is_required = ?, excel_cell = ?
WHERE id = ?
''', [field_label, field_type, int(max_length) if max_length else None, is_required, excel_cell or None, field_id])
flash(f'Field "{field_label}" updated successfully!', 'success')
return redirect(url_for('cons_sheets.process_fields', process_id=process_id))
return render_template('cons_sheets/edit_field.html',
process=process,
field=field)
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>/fields/<int:field_id>/delete', methods=['POST'])
@role_required('owner', 'admin')
def delete_field(process_id, field_id):
"""Soft-delete a field (rename column, set is_active = 0)"""
field = query_db('SELECT * FROM cons_process_fields WHERE id = ? AND process_id = ?', [field_id, process_id], one=True)
if not field:
return jsonify({'success': False, 'message': 'Field not found'})
# Soft delete: set is_active = 0
execute_db('UPDATE cons_process_fields SET is_active = 0 WHERE id = ?', [field_id])
return jsonify({'success': True, 'message': f'Field "{field["field_label"]}" deleted'})
# ============================================
# STAFF-FACING ROUTES (Scanning Interface)
# ============================================
from utils import login_required
@cons_sheets_bp.route('/cons-sheets')
@login_required
def index():
"""Consumption Sheets module landing - show user's sessions"""
user_id = session.get('user_id')
# Check if user has access to this module
has_access = query_db('''
SELECT 1 FROM UserModules um
JOIN Modules m ON um.module_id = m.module_id
WHERE um.user_id = ? AND m.module_key = 'cons_sheets' AND m.is_active = 1
''', [user_id], one=True)
if not has_access:
flash('You do not have access to this module', 'danger')
return redirect(url_for('home'))
# Get user's active sessions with process info
active_sessions = query_db('''
SELECT cs.*, cp.process_name, cp.process_key,
(SELECT COUNT(*) FROM cons_session_details WHERE session_id = cs.id AND is_deleted = 0) as scan_count
FROM cons_sessions cs
JOIN cons_processes cp ON cs.process_id = cp.id
WHERE cs.created_by = ? AND cs.status = 'active'
ORDER BY cs.created_at DESC
''', [user_id])
# Get available process types for creating new sessions
processes = query_db('''
SELECT * FROM cons_processes WHERE is_active = 1 ORDER BY process_name
''')
return render_template('cons_sheets/staff_index.html',
sessions=active_sessions,
processes=processes)
@cons_sheets_bp.route('/cons-sheets/new/<int:process_id>', methods=['GET', 'POST'])
@login_required
def new_session(process_id):
"""Create a new scanning session - enter header info"""
process = query_db('SELECT * FROM cons_processes WHERE id = ? AND is_active = 1', [process_id], one=True)
if not process:
flash('Process not found', 'danger')
return redirect(url_for('cons_sheets.index'))
# Get header fields for this process
header_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'header' AND is_active = 1
ORDER BY sort_order, id
''', [process_id])
if request.method == 'POST':
# Validate required fields
missing_required = []
for field in header_fields:
if field['is_required']:
value = request.form.get(field['field_name'], '').strip()
if not value:
missing_required.append(field['field_label'])
if missing_required:
flash(f'Required fields missing: {", ".join(missing_required)}', 'danger')
return render_template('cons_sheets/new_session.html',
process=process,
header_fields=header_fields,
form_data=request.form)
# Create the session
session_id = execute_db('''
INSERT INTO cons_sessions (process_id, created_by)
VALUES (?, ?)
''', [process_id, session['user_id']])
# Save header field values
for field in header_fields:
value = request.form.get(field['field_name'], '').strip()
if value:
execute_db('''
INSERT INTO cons_session_header_values (session_id, field_id, field_value)
VALUES (?, ?, ?)
''', [session_id, field['id'], value])
flash('Session created! Start scanning lots.', 'success')
return redirect(url_for('cons_sheets.scan_session', session_id=session_id))
return render_template('cons_sheets/new_session.html',
process=process,
header_fields=header_fields,
form_data={})
@cons_sheets_bp.route('/cons-sheets/session/<int:session_id>')
@login_required
def scan_session(session_id):
"""Main scanning interface for a session"""
# Get session with process info
sess = query_db('''
SELECT cs.*, cp.process_name, cp.process_key, cp.id as process_id
FROM cons_sessions cs
JOIN cons_processes cp ON cs.process_id = cp.id
WHERE cs.id = ?
''', [session_id], one=True)
if not sess:
flash('Session not found', 'danger')
return redirect(url_for('cons_sheets.index'))
if sess['status'] == 'archived':
flash('This session has been archived', 'warning')
return redirect(url_for('cons_sheets.index'))
# Get header values for display
header_values = query_db('''
SELECT cpf.field_label, cpf.field_name, cshv.field_value
FROM cons_session_header_values cshv
JOIN cons_process_fields cpf ON cshv.field_id = cpf.id
WHERE cshv.session_id = ?
ORDER BY cpf.sort_order, cpf.id
''', [session_id])
# Get scanned details
scans = query_db('''
SELECT csd.*, u.full_name as scanned_by_name
FROM cons_session_details csd
JOIN Users u ON csd.scanned_by = u.user_id
WHERE csd.session_id = ? AND csd.is_deleted = 0
ORDER BY csd.scanned_at DESC
''', [session_id])
# Get detail fields for reference
detail_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
ORDER BY sort_order, id
''', [sess['process_id']])
return render_template('cons_sheets/scan_session.html',
session=sess,
header_values=header_values,
scans=scans,
detail_fields=detail_fields)
@cons_sheets_bp.route('/cons-sheets/session/<int:session_id>/scan', methods=['POST'])
@login_required
def scan_lot(session_id):
"""Process a lot scan with duplicate detection"""
sess = query_db('SELECT * FROM cons_sessions WHERE id = ? AND status = "active"', [session_id], one=True)
if not sess:
return jsonify({'success': False, 'message': 'Session not found or archived'})
data = request.get_json()
lot_number = data.get('lot_number', '').strip()
item_number = data.get('item_number', '').strip()
weight = data.get('weight')
confirm_duplicate = data.get('confirm_duplicate', False)
check_only = data.get('check_only', False)
if not lot_number:
return jsonify({'success': False, 'message': 'Lot number required'})
if not check_only and weight is None:
return jsonify({'success': False, 'message': 'Weight required'})
if not check_only:
try:
weight = float(weight)
except (ValueError, TypeError):
return jsonify({'success': False, 'message': 'Invalid weight value'})
# Check for duplicates in SAME session
same_session_dup = query_db('''
SELECT * FROM cons_session_details
WHERE session_id = ? AND lot_number = ? AND is_deleted = 0
''', [session_id, lot_number], one=True)
# Check for duplicates in OTHER sessions (with header info for context)
other_session_dup = query_db('''
SELECT csd.*, cs.id as other_session_id, cs.created_at as other_session_date,
u.full_name as other_user,
(SELECT field_value FROM cons_session_header_values
WHERE session_id = cs.id AND field_id = (
SELECT id FROM cons_process_fields
WHERE process_id = cs.process_id AND field_name LIKE '%wo%' AND is_active = 1 LIMIT 1
)) as other_wo
FROM cons_session_details csd
JOIN cons_sessions cs ON csd.session_id = cs.id
JOIN Users u ON csd.scanned_by = u.user_id
WHERE csd.lot_number = ? AND csd.session_id != ? AND csd.is_deleted = 0
ORDER BY csd.scanned_at DESC
LIMIT 1
''', [lot_number, session_id], one=True)
duplicate_status = 'normal'
duplicate_info = None
needs_confirmation = False
if same_session_dup:
duplicate_status = 'dup_same_session'
duplicate_info = 'Already scanned in this session'
needs_confirmation = True
elif other_session_dup:
duplicate_status = 'dup_other_session'
dup_date = other_session_dup['other_session_date'][:10] if other_session_dup['other_session_date'] else 'Unknown'
dup_user = other_session_dup['other_user'] or 'Unknown'
dup_wo = other_session_dup['other_wo'] or 'N/A'
duplicate_info = f"Previously scanned on {dup_date} by {dup_user} on WO {dup_wo}"
needs_confirmation = True
# If just checking, return early
if check_only:
if needs_confirmation:
return jsonify({
'success': False,
'needs_confirmation': True,
'duplicate_status': duplicate_status,
'duplicate_info': duplicate_info,
'message': duplicate_info
})
return jsonify({'success': True, 'needs_confirmation': False})
# If needs confirmation and not confirmed, ask user
if needs_confirmation and not confirm_duplicate:
return jsonify({
'success': False,
'needs_confirmation': True,
'duplicate_status': duplicate_status,
'duplicate_info': duplicate_info,
'message': duplicate_info
})
# Insert the scan
detail_id = execute_db('''
INSERT INTO cons_session_details
(session_id, item_number, lot_number, weight, scanned_by, duplicate_status, duplicate_info)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', [session_id, item_number, lot_number, weight, session['user_id'], duplicate_status, duplicate_info])
# If this is a same-session duplicate, update the original scan too
updated_entry_ids = []
if duplicate_status == 'dup_same_session' and same_session_dup:
execute_db('''
UPDATE cons_session_details
SET duplicate_status = 'dup_same_session', duplicate_info = 'Duplicate lot'
WHERE id = ?
''', [same_session_dup['id']])
updated_entry_ids.append(same_session_dup['id'])
return jsonify({
'success': True,
'detail_id': detail_id,
'duplicate_status': duplicate_status,
'updated_entry_ids': updated_entry_ids
})
@cons_sheets_bp.route('/cons-sheets/detail/<int:detail_id>')
@login_required
def get_detail(detail_id):
"""Get detail info for editing"""
detail = query_db('''
SELECT csd.*, u.full_name as scanned_by_name
FROM cons_session_details csd
JOIN Users u ON csd.scanned_by = u.user_id
WHERE csd.id = ?
''', [detail_id], one=True)
if not detail:
return jsonify({'success': False, 'message': 'Detail not found'})
return jsonify({'success': True, 'detail': dict(detail)})
@cons_sheets_bp.route('/cons-sheets/detail/<int:detail_id>/update', methods=['POST'])
@login_required
def update_detail(detail_id):
"""Update a scanned detail"""
detail = query_db('SELECT * FROM cons_session_details WHERE id = ?', [detail_id], one=True)
if not detail:
return jsonify({'success': False, 'message': 'Detail not found'})
# Check permission
if detail['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'})
data = request.get_json()
item_number = data.get('item_number', '').strip()
lot_number = data.get('lot_number', '').strip()
weight = data.get('weight')
comment = data.get('comment', '')
if not lot_number:
return jsonify({'success': False, 'message': 'Lot number required'})
try:
weight = float(weight)
except (ValueError, TypeError):
return jsonify({'success': False, 'message': 'Invalid weight'})
execute_db('''
UPDATE cons_session_details
SET item_number = ?, lot_number = ?, weight = ?, comment = ?
WHERE id = ?
''', [item_number, lot_number, weight, comment, detail_id])
return jsonify({'success': True})
@cons_sheets_bp.route('/cons-sheets/detail/<int:detail_id>/delete', methods=['POST'])
@login_required
def delete_detail(detail_id):
"""Soft-delete a scanned detail"""
detail = query_db('SELECT * FROM cons_session_details WHERE id = ?', [detail_id], one=True)
if not detail:
return jsonify({'success': False, 'message': 'Detail not found'})
# Check permission
if detail['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'})
execute_db('UPDATE cons_session_details SET is_deleted = 1 WHERE id = ?', [detail_id])
return jsonify({'success': True})
@cons_sheets_bp.route('/cons-sheets/session/<int:session_id>/archive', methods=['POST'])
@login_required
def archive_session(session_id):
"""Archive (soft-delete) a session"""
sess = query_db('SELECT * FROM cons_sessions WHERE id = ?', [session_id], one=True)
if not sess:
return jsonify({'success': False, 'message': 'Session not found'})
# Check permission
if sess['created_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'})
execute_db('UPDATE cons_sessions SET status = "archived" WHERE id = ?', [session_id])
return jsonify({'success': True})