1369 lines
58 KiB
Python
1369 lines
58 KiB
Python
"""
|
|
Consumption Sheets Module - Routes
|
|
Converted from conssheets.py
|
|
"""
|
|
from flask import render_template, request, redirect, url_for, flash, jsonify, session, send_file
|
|
from db import query_db, execute_db
|
|
from utils import login_required, role_required
|
|
from datetime import datetime
|
|
from global_actions import execute_pipeline
|
|
import sqlite3
|
|
import io
|
|
import os
|
|
|
|
|
|
|
|
def register_routes(bp):
|
|
"""Register all conssheets routes on the blueprint"""
|
|
|
|
# =========================================================================
|
|
# CONSUMPTION SHEETS ROUTES
|
|
# =========================================================================
|
|
|
|
@bp.route('/admin')
@role_required('owner', 'admin')
def admin_processes():
    """Render the admin list of consumption sheet process types.

    The ``archived=1`` query parameter switches the listing from rows with
    ``is_active = 1`` to rows with ``is_active = 0``.
    """
    showing_archived = request.args.get('archived') == '1'

    # Archived rows are stored with is_active = 0, live rows with 1.
    active_flag = 1
    if showing_archived:
        active_flag = 0

    rows = query_db('''
        SELECT cp.*,
               u.full_name as created_by_name,
               (SELECT COUNT(*) FROM cons_process_fields WHERE process_id = cp.id) as field_count
        FROM cons_processes cp
        LEFT JOIN users u ON cp.created_by = u.user_id
        WHERE cp.is_active = ?
        ORDER BY cp.process_name ASC
    ''', [active_flag])

    return render_template(
        'conssheets/admin_processes.html',
        processes=rows,
        showing_archived=showing_archived,
    )
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/create', methods=['GET', 'POST'])
@role_required('owner', 'admin')
def create_process():
    """Create a new process type.

    GET renders the creation form. POST validates the submitted name,
    derives a ``process_key`` from it, inserts the ``cons_processes`` row,
    creates the per-process detail table and redirects to the new process's
    detail page. Validation failures flash a message and redirect back to
    the form.
    """
    if request.method != 'POST':
        return render_template('conssheets/create_process.html')

    process_name = request.form.get('process_name', '').strip()
    if not process_name:
        flash('Process name is required', 'danger')
        return redirect(url_for('conssheets.create_process'))

    # Generate process_key from name: lowercase, spaces/hyphens become
    # underscores, everything else that is not alphanumeric is dropped.
    process_key = process_name.lower().replace(' ', '_').replace('-', '_')
    process_key = ''.join(c for c in process_key if c.isalnum() or c == '_')

    # A name made only of punctuation collapses to an empty (or
    # underscore-only) key, which would produce a meaningless table name.
    if not process_key.strip('_'):
        flash('Process name must contain at least one letter or number', 'danger')
        return redirect(url_for('conssheets.create_process'))

    # Check for duplicate key
    existing = query_db('SELECT id FROM cons_processes WHERE process_key = ?', [process_key], one=True)
    if existing:
        flash(f'A process with key "{process_key}" already exists', 'danger')
        return redirect(url_for('conssheets.create_process'))

    process_id = execute_db('''
        INSERT INTO cons_processes (process_key, process_name, created_by)
        VALUES (?, ?, ?)
    ''', [process_key, process_name, session['user_id']])

    # Create dynamic detail table for this process
    create_process_detail_table(process_key)

    flash(f'Process "{process_name}" created successfully!', 'success')
    return redirect(url_for('conssheets.process_detail', process_id=process_id))
|
|
|
|
|
|
|
|
def get_db_path():
    """Return the relative path of the SQLite database file.

    Two debug lines are printed on every call: the path itself and whether
    that path currently exists on disk.
    """
    path = 'database/scanlook.db'
    found = os.path.exists(path)
    print(f"DEBUG: Database path is: {path}")
    print(f"DEBUG: Path exists: {found}")
    return path
|
|
|
|
|
|
def create_process_detail_table(process_key):
    """Create the dynamic detail table for a process with system columns.

    The table is named ``cons_proc_<process_key>_details`` and is created
    together with an index on ``(session_id, is_deleted)``. Both statements
    use ``IF NOT EXISTS``. Failures are printed rather than raised; the
    connection is always closed.

    Args:
        process_key: Key embedded in the table and index names. It is
            interpolated directly into SQL, so callers must pass an
            already-sanitised identifier.
    """
    table_name = f'cons_proc_{process_key}_details'
    print(f"DEBUG: Creating table {table_name}")

    conn = None
    try:
        conn = sqlite3.connect(get_db_path())
        cursor = conn.cursor()

        cursor.execute(f'''
            CREATE TABLE IF NOT EXISTS {table_name} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id INTEGER NOT NULL,
                scanned_by INTEGER NOT NULL,
                scanned_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                duplicate_status TEXT DEFAULT 'normal' CHECK(duplicate_status IN ('normal', 'dup_same_session', 'dup_other_session')),
                duplicate_info TEXT,
                comment TEXT,
                is_deleted INTEGER DEFAULT 0,
                FOREIGN KEY (session_id) REFERENCES cons_sessions(id),
                FOREIGN KEY (scanned_by) REFERENCES Users(user_id)
            )
        ''')

        # Create index on session_id
        cursor.execute(f'CREATE INDEX IF NOT EXISTS idx_{process_key}_session ON {table_name}(session_id, is_deleted)')

        conn.commit()
        print(f"DEBUG: Table {table_name} created successfully!")
    except Exception as e:
        print(f"ERROR creating table {table_name}: {e}")
    finally:
        # Close even when execute/commit raised, so the handle is not leaked.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def add_column_to_detail_table(process_key, field_name, field_type):
    """Append a column to ``cons_proc_<process_key>_details``.

    ``field_type`` values ``'INTEGER'`` and ``'REAL'`` are used as the
    column type as-is; any other value becomes ``TEXT``. A failing
    ``ALTER TABLE`` is printed and otherwise ignored.
    """
    target_table = 'cons_proc_{}_details'.format(process_key)

    # Map field types to SQLite types (anything unrecognised is TEXT)
    column_type = field_type if field_type in ('INTEGER', 'REAL') else 'TEXT'

    connection = sqlite3.connect(get_db_path())
    cur = connection.cursor()
    try:
        cur.execute(f'ALTER TABLE {target_table} ADD COLUMN {field_name} {column_type}')
        connection.commit()
    except Exception as exc:
        # Column might already exist
        print(f"Note: Could not add column {field_name}: {exc}")
    finally:
        connection.close()
|
|
|
|
|
|
def rename_column_in_detail_table(process_key, old_name, new_name):
    """Rename a column of ``cons_proc_<process_key>_details``.

    Used for soft deletes. A failing ``ALTER TABLE`` is printed and
    otherwise ignored; the connection is closed in every case.
    """
    target_table = 'cons_proc_{}_details'.format(process_key)

    connection = sqlite3.connect(get_db_path())
    cur = connection.cursor()
    try:
        cur.execute(f'ALTER TABLE {target_table} RENAME COLUMN {old_name} TO {new_name}')
        connection.commit()
    except Exception as exc:
        print(f"Note: Could not rename column {old_name}: {exc}")
    finally:
        connection.close()
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/delete', methods=['POST'])
@role_required('owner', 'admin')
def delete_process(process_id):
    """Archive a process type by clearing its ``is_active`` flag.

    The row itself is kept. Either outcome (archived or not found) flashes a
    message and redirects to the admin process list.
    """
    row = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)

    if row:
        # Soft delete only: the default admin listing selects is_active = 1,
        # so the process disappears from it.
        execute_db('UPDATE cons_processes SET is_active = 0 WHERE id = ?', [process_id])
        flash(f'Process "{row["process_name"]}" has been deleted.', 'success')
    else:
        flash('Process not found', 'danger')

    return redirect(url_for('conssheets.admin_processes'))
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/restore', methods=['POST'])
@role_required('owner', 'admin')
def restore_process(process_id):
    """Restore a soft-deleted process type by setting ``is_active = 1``.

    Flashes an error instead of a success message when no process has the
    given id. Always redirects to the archived view of the admin list.
    """
    process = query_db('SELECT id FROM cons_processes WHERE id = ?', [process_id], one=True)

    if not process:
        # Mirror delete_process: do not claim success for an unknown id.
        flash('Process not found', 'danger')
    else:
        execute_db('UPDATE cons_processes SET is_active = 1 WHERE id = ?', [process_id])
        flash('Process has been restored.', 'success')

    return redirect(url_for('conssheets.admin_processes', archived=1))
|
|
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>')
@role_required('owner', 'admin')
def process_detail(process_id):
    """Show the overview page for a single process type.

    Loads the process row and its active header/detail field definitions
    and passes them to ``process_detail.html``. An unknown id flashes an
    error and redirects to the admin list.
    """
    proc = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    if not proc:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    page_data = {'process': proc}

    # Active header-level field definitions, in display order
    page_data['header_fields'] = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'header' AND is_active = 1
        ORDER BY sort_order, id
    ''', [process_id])

    # Active detail-level field definitions, in display order
    page_data['detail_fields'] = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
        ORDER BY sort_order, id
    ''', [process_id])

    return render_template('conssheets/process_detail.html', **page_data)
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/router')
@role_required('owner', 'admin')
def process_router(process_id):
    """Show the routing-rule configuration page for a process.

    Lists every ``cons_process_router`` row of the process ordered by
    ``line_number``. An unknown process id flashes an error and redirects
    to the admin list.
    """
    proc = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    if not proc:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    # Rules come back in ascending line-number order (10, 20, ...)
    rule_rows = query_db('''
        SELECT * FROM cons_process_router
        WHERE process_id = ?
        ORDER BY line_number ASC
    ''', [process_id])

    return render_template(
        'conssheets/process_router.html',
        process=proc,
        rules=rule_rows,
    )
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/router/add', methods=['POST'])
@role_required('owner', 'admin')
def add_router_rule(process_id):
    """Add a new routing rule to a process.

    Reads ``line_number``, ``rule_name`` and ``match_pattern`` from the form.
    All three are required and ``line_number`` must be an integer. The rule
    is inserted active, with an empty action list (``'[]'``). Every outcome
    except an unknown process redirects back to the router page.
    """
    process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    if not process:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    router_url = url_for('conssheets.process_router', process_id=process_id)

    line_number = request.form.get('line_number')
    rule_name = request.form.get('rule_name')
    match_pattern = request.form.get('match_pattern')

    # Basic validation
    if not line_number or not rule_name or not match_pattern:
        flash('All fields are required', 'danger')
        return redirect(router_url)

    # Rules are read back with ORDER BY line_number, so a non-numeric value
    # stored as text would sort incorrectly and change rule priority.
    try:
        line_number = int(line_number)
    except ValueError:
        flash('Line number must be a whole number', 'danger')
        return redirect(router_url)

    try:
        execute_db('''
            INSERT INTO cons_process_router
            (process_id, line_number, rule_name, match_pattern, actions_json, is_active)
            VALUES (?, ?, ?, ?, '[]', 1)
        ''', [process_id, line_number, rule_name, match_pattern])

        flash(f'Rule {line_number} created successfully!', 'success')
    except Exception as e:
        flash(f'Error creating rule: {str(e)}', 'danger')

    return redirect(router_url)
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/router/<int:rule_id>/edit', methods=['GET', 'POST'])
@role_required('owner', 'admin')
def edit_router_rule(process_id, rule_id):
    """Edit a specific routing rule and its logic actions.

    GET renders the editor with the rule and the process's active fields.
    POST validates and saves ``line_number``, ``rule_name``,
    ``match_pattern`` and the ``actions_json`` string, then redirects back
    to the editor. The rule must belong to ``process_id``.
    """
    import json

    process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    # Scope the lookup to the process in the URL so a mismatched
    # process_id/rule_id pair cannot edit another process's rule.
    rule = query_db(
        'SELECT * FROM cons_process_router WHERE id = ? AND process_id = ?',
        [rule_id, process_id], one=True)

    if not process or not rule:
        flash('Rule not found', 'danger')
        return redirect(url_for('conssheets.process_router', process_id=process_id))

    if request.method == 'POST':
        editor_url = url_for('conssheets.edit_router_rule', process_id=process_id, rule_id=rule_id)

        # 1. Basic info
        line_number = request.form.get('line_number')
        rule_name = request.form.get('rule_name')
        match_pattern = request.form.get('match_pattern')

        # 2. The logic chain, submitted as a raw JSON string
        actions_json = request.form.get('actions_json', '[]')

        if not line_number or not rule_name or not match_pattern:
            flash('All fields are required', 'danger')
            return redirect(editor_url)

        try:
            line_number = int(line_number)
        except ValueError:
            flash('Line number must be a whole number', 'danger')
            return redirect(editor_url)

        # Reject text that is not JSON now; otherwise it would only fail
        # later, when the scan endpoint json.loads() the stored value.
        try:
            json.loads(actions_json)
        except ValueError:
            flash('Actions must be valid JSON', 'danger')
            return redirect(editor_url)

        try:
            execute_db('''
                UPDATE cons_process_router
                SET line_number = ?, rule_name = ?, match_pattern = ?, actions_json = ?
                WHERE id = ?
            ''', [line_number, rule_name, match_pattern, actions_json, rule_id])

            flash('Rule configuration saved!', 'success')
        except Exception as e:
            flash(f'Error saving rule: {str(e)}', 'danger')

        return redirect(editor_url)

    # All active fields, used by the Logic Editor dropdowns (GET only)
    fields = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND is_active = 1
        ORDER BY table_type, sort_order
    ''', [process_id])

    return render_template('conssheets/edit_rule.html', process=process, rule=rule, fields=fields)
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/fields')
@role_required('owner', 'admin')
def process_fields(process_id):
    """Show the field configuration page for a process.

    Passes the process row and its active header and detail field
    definitions to ``process_fields.html``. An unknown id flashes an error
    and redirects to the admin list.
    """
    proc = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    if not proc:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    params = [process_id]

    # Header-level fields
    headers = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'header' AND is_active = 1
        ORDER BY sort_order, id
    ''', params)

    # Detail-level fields
    details = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
        ORDER BY sort_order, id
    ''', params)

    return render_template(
        'conssheets/process_fields.html',
        process=proc,
        header_fields=headers,
        detail_fields=details,
    )
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/template')
@role_required('owner', 'admin')
def process_template(process_id):
    """Show the Excel template configuration page for a process.

    The process's active header and detail fields are loaded so the
    template can display them for mapping. An unknown id flashes an error
    and redirects to the admin list.
    """
    proc = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    if not proc:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    # All active fields, split by table type, for the mapping display
    header_rows = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'header' AND is_active = 1
        ORDER BY sort_order, id
    ''', [process_id])
    detail_rows = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
        ORDER BY sort_order, id
    ''', [process_id])

    template_context = dict(
        process=proc,
        header_fields=header_rows,
        detail_fields=detail_rows,
    )
    return render_template('conssheets/process_template.html', **template_context)
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/template/upload', methods=['POST'])
@role_required('owner', 'admin')
def upload_template(process_id):
    """Upload an Excel template file for a process.

    Expects a multipart upload under the form key ``template_file``. Only
    filenames ending in ``.xlsx`` (any letter case) are accepted. The file
    bytes and filename are stored on the ``cons_processes`` row. Every
    outcome except an unknown process redirects to the template page.
    """
    process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    if not process:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    template_url = url_for('conssheets.process_template', process_id=process_id)

    if 'template_file' not in request.files:
        flash('No file selected', 'danger')
        return redirect(template_url)

    file = request.files['template_file']

    # Covers both an empty string and a missing (None) filename.
    if not file.filename:
        flash('No file selected', 'danger')
        return redirect(template_url)

    # Compare case-insensitively so "REPORT.XLSX" is accepted too.
    if not file.filename.lower().endswith('.xlsx'):
        flash('Only .xlsx files are allowed', 'danger')
        return redirect(template_url)

    # Read file as binary
    template_data = file.read()
    filename = file.filename

    # Store in database
    execute_db('''
        UPDATE cons_processes
        SET template_file = ?, template_filename = ?
        WHERE id = ?
    ''', [template_data, filename, process_id])

    flash(f'Template "{filename}" uploaded successfully!', 'success')
    return redirect(template_url)
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/template/settings', methods=['POST'])
@role_required('owner', 'admin')
def update_template_settings(process_id):
    """Save the page-layout settings of a process's Excel template.

    Form fields: ``rows_per_page`` (default 30), ``detail_start_row``
    (default 10), ``page_height`` (required, must be a non-zero integer),
    ``print_start_col`` (default ``A``) and ``print_end_col``. Column
    values are stripped and upper-cased. Every outcome except an unknown
    process redirects to the template page.
    """
    proc = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    if not proc:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    def back_to_template():
        return redirect(url_for('conssheets.process_template', process_id=process_id))

    form = request.form
    raw_rows = form.get('rows_per_page', 30)
    raw_start = form.get('detail_start_row', 10)
    raw_height = form.get('page_height')
    print_start_col = form.get('print_start_col', 'A').strip().upper()
    print_end_col = form.get('print_end_col', '').strip().upper()

    try:
        rows_per_page = int(raw_rows)
        detail_start_row = int(raw_start)
        # Blank or missing page height is treated as "not provided"
        if raw_height and raw_height.strip():
            page_height = int(raw_height)
        else:
            page_height = None
    except ValueError:
        flash('Invalid number values', 'danger')
        return back_to_template()

    # page_height is mandatory; a value of 0 is rejected as well
    if not page_height:
        flash('Page Height is required for the new strategy', 'danger')
        return back_to_template()

    # detail_end_row is intentionally not part of this update
    execute_db('''
        UPDATE cons_processes
        SET rows_per_page = ?, detail_start_row = ?, page_height = ?,
            print_start_col = ?, print_end_col = ?
        WHERE id = ?
    ''', [rows_per_page, detail_start_row, page_height, print_start_col, print_end_col, process_id])

    flash('Settings updated successfully!', 'success')
    return back_to_template()
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/template/download')
@role_required('owner', 'admin')
def download_template(process_id):
    """Download the stored Excel template as an attachment.

    Flashes an error and redirects to the template page when the process
    does not exist or has no template stored.
    """
    from urllib.parse import quote

    from flask import Response

    process = query_db('SELECT template_file, template_filename FROM cons_processes WHERE id = ?', [process_id], one=True)

    if not process or not process['template_file']:
        flash('No template found', 'danger')
        return redirect(url_for('conssheets.process_template', process_id=process_id))

    filename = process['template_filename'] or 'template.xlsx'

    # An unquoted filename breaks on spaces and special characters. Send a
    # quoted ASCII fallback plus an RFC 5987 "filename*" parameter that
    # carries the exact (possibly non-ASCII) name percent-encoded.
    ascii_name = filename.encode('ascii', 'replace').decode('ascii')
    ascii_name = ascii_name.replace('\\', '_').replace('"', '_')
    disposition = 'attachment; filename="{}"; filename*=UTF-8\'\'{}'.format(
        ascii_name, quote(filename, safe=''))

    return Response(
        process['template_file'],
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        headers={'Content-Disposition': disposition}
    )
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/fields/add/<table_type>', methods=['GET', 'POST'])
@role_required('owner', 'admin')
def add_field(process_id, table_type):
    """Add a new field definition to a process.

    ``table_type`` must be ``'header'`` or ``'detail'``. GET renders the
    form. POST validates the input, derives ``field_name`` from the label,
    inserts the ``cons_process_fields`` row and, for detail fields, adds a
    matching column to the process's detail table. Validation failures
    flash a message and redirect back to the form.
    """
    if table_type not in ['header', 'detail']:
        flash('Invalid table type', 'danger')
        return redirect(url_for('conssheets.process_fields', process_id=process_id))

    process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)

    if not process:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    if request.method == 'POST':
        form_url = url_for('conssheets.add_field', process_id=process_id, table_type=table_type)

        field_label = request.form.get('field_label', '').strip()
        field_type = request.form.get('field_type', 'TEXT')
        max_length = request.form.get('max_length', '').strip()
        is_required = 1 if request.form.get('is_required') else 0
        excel_cell = request.form.get('excel_cell', '').strip().upper()

        if not field_label:
            flash('Field label is required', 'danger')
            return redirect(form_url)

        # Generate field_name from label (lowercase, underscores)
        field_name = field_label.lower().replace(' ', '_').replace('-', '_')
        field_name = ''.join(c for c in field_name if c.isalnum() or c == '_')

        # A label made only of punctuation leaves nothing usable as a
        # field/column name.
        if not field_name.strip('_'):
            flash('Field label must contain at least one letter or number', 'danger')
            return redirect(form_url)

        # Report a non-numeric max length instead of letting int() raise.
        max_length_value = None
        if max_length:
            try:
                max_length_value = int(max_length)
            except ValueError:
                flash('Max length must be a whole number', 'danger')
                return redirect(form_url)

        # Check for duplicate field name in this process/table_type
        existing = query_db('''
            SELECT id FROM cons_process_fields
            WHERE process_id = ? AND table_type = ? AND field_name = ? AND is_active = 1
        ''', [process_id, table_type, field_name], one=True)

        if existing:
            flash(f'A field with name "{field_name}" already exists', 'danger')
            return redirect(form_url)

        # Get next sort_order
        max_sort = query_db('''
            SELECT MAX(sort_order) as max_sort FROM cons_process_fields
            WHERE process_id = ? AND table_type = ?
        ''', [process_id, table_type], one=True)
        sort_order = (max_sort['max_sort'] or 0) + 1

        # The duplicate-key checkbox only applies to detail fields
        is_duplicate_key = 1 if (table_type == 'detail' and request.form.get('is_duplicate_key')) else 0

        # Insert the field
        execute_db('''
            INSERT INTO cons_process_fields
            (process_id, table_type, field_name, field_label, field_type, max_length, is_required, is_duplicate_key, sort_order, excel_cell)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', [process_id, table_type, field_name, field_label, field_type,
              max_length_value, is_required, is_duplicate_key, sort_order, excel_cell or None])

        # For detail fields, also add the column to the dynamic table
        if table_type == 'detail':
            add_column_to_detail_table(process['process_key'], field_name, field_type)

        flash(f'Field "{field_label}" added successfully!', 'success')
        return redirect(url_for('conssheets.process_fields', process_id=process_id))

    return render_template('conssheets/add_field.html',
                           process=process,
                           table_type=table_type)
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/fields/<int:field_id>/edit', methods=['GET', 'POST'])
@role_required('owner', 'admin')
def edit_field(process_id, field_id):
    """Edit an existing field definition.

    GET renders the edit form. POST updates the label, type, max length,
    required flag, duplicate-key flag (detail fields only) and Excel cell
    of the field. ``field_name`` is not changed. The field must belong to
    ``process_id``.
    """
    process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
    field = query_db('SELECT * FROM cons_process_fields WHERE id = ? AND process_id = ?', [field_id, process_id], one=True)

    if not process or not field:
        flash('Process or field not found', 'danger')
        return redirect(url_for('conssheets.admin_processes'))

    if request.method == 'POST':
        form_url = url_for('conssheets.edit_field', process_id=process_id, field_id=field_id)

        field_label = request.form.get('field_label', '').strip()
        field_type = request.form.get('field_type', 'TEXT')
        max_length = request.form.get('max_length', '').strip()
        is_required = 1 if request.form.get('is_required') else 0
        is_duplicate_key = 1 if (field['table_type'] == 'detail' and request.form.get('is_duplicate_key')) else 0
        excel_cell = request.form.get('excel_cell', '').strip().upper()

        if not field_label:
            flash('Field label is required', 'danger')
            return redirect(form_url)

        # Report a non-numeric max length instead of letting int() raise.
        max_length_value = None
        if max_length:
            try:
                max_length_value = int(max_length)
            except ValueError:
                flash('Max length must be a whole number', 'danger')
                return redirect(form_url)

        execute_db('''
            UPDATE cons_process_fields
            SET field_label = ?, field_type = ?, max_length = ?, is_required = ?, is_duplicate_key = ?, excel_cell = ?
            WHERE id = ?
        ''', [field_label, field_type, max_length_value, is_required, is_duplicate_key, excel_cell or None, field_id])

        flash(f'Field "{field_label}" updated successfully!', 'success')
        return redirect(url_for('conssheets.process_fields', process_id=process_id))

    return render_template('conssheets/edit_field.html',
                           process=process,
                           field=field)
|
|
|
|
|
|
@bp.route('/admin/consumption-sheets/<int:process_id>/fields/<int:field_id>/delete', methods=['POST'])
@role_required('owner', 'admin')
def delete_field(process_id, field_id):
    """Soft-delete a field and answer with a JSON status object.

    The field row gets ``is_active = 0``. For detail fields the backing
    column is renamed to ``Del_<field_id>_<field_name>`` so its data is kept.
    """
    field_row = query_db('SELECT * FROM cons_process_fields WHERE id = ? AND process_id = ?', [field_id, process_id], one=True)
    process_row = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)

    if not (field_row and process_row):
        return jsonify({'success': False, 'message': 'Field not found'})

    # Mark the definition inactive rather than removing the row
    execute_db('UPDATE cons_process_fields SET is_active = 0 WHERE id = ?', [field_id])

    # Detail fields own a physical column: rename it instead of dropping it
    if field_row['table_type'] == 'detail':
        current_name = field_row['field_name']
        rename_column_in_detail_table(
            process_row['process_key'],
            current_name,
            f"Del_{field_id}_{current_name}",
        )

    return jsonify({'success': True, 'message': f'Field "{field_row["field_label"]}" deleted'})
|
|
|
|
|
|
|
|
|
|
def get_detail_table_name(process_key):
    """Build the name of a process's dynamic detail table.

    The result is ``cons_proc_<process_key>_details``.
    """
    return 'cons_proc_{}_details'.format(process_key)
|
|
|
|
|
|
def get_duplicate_key_field(process_id):
    """Fetch the active detail field flagged as duplicate key.

    Returns whatever ``query_db(..., one=True)`` yields for the first
    matching ``cons_process_fields`` row of the process.
    """
    sql = '''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'detail' AND is_duplicate_key = 1 AND is_active = 1
        LIMIT 1
    '''
    return query_db(sql, [process_id], one=True)
|
|
|
|
|
|
@bp.route('/')
@login_required
def index():
    """Consumption Sheets landing page: list the current user's sessions.

    Users without an active ``conssheets`` module assignment are redirected
    to ``home``. By default only sessions with status ``active`` are shown;
    ``show_archived=1`` adds archived sessions and up to 20 deleted ones.
    Each session is enriched with its scan count and a preview of up to
    five required header fields.
    """
    user_id = session.get('user_id')

    # Check if user has access to this module
    has_access = query_db('''
        SELECT 1 FROM UserModules um
        JOIN Modules m ON um.module_id = m.module_id
        WHERE um.user_id = ? AND m.module_key = 'conssheets' AND m.is_active = 1
    ''', [user_id], one=True)

    if not has_access:
        flash('You do not have access to this module', 'danger')
        return redirect(url_for('home'))

    # Check if user wants to see archived/deleted sessions
    show_archived = request.args.get('show_archived') == '1'

    if show_archived:
        # Show active + archived + last 20 deleted
        all_sessions = query_db('''
            SELECT cs.*, cp.process_name, cp.process_key
            FROM cons_sessions cs
            JOIN cons_processes cp ON cs.process_id = cp.id
            WHERE cs.created_by = ?
            ORDER BY
                CASE cs.status
                    WHEN 'active' THEN 1
                    WHEN 'archived' THEN 2
                    WHEN 'deleted' THEN 3
                END,
                cs.created_at DESC
        ''', [user_id])

        # Separate active, archived, and deleted (limit deleted to 20)
        active_sessions = [s for s in all_sessions if s['status'] == 'active']
        archived_sessions = [s for s in all_sessions if s['status'] == 'archived']
        deleted_sessions = [s for s in all_sessions if s['status'] == 'deleted'][:20]

        combined_sessions = active_sessions + archived_sessions + deleted_sessions
    else:
        # Show only active sessions
        combined_sessions = query_db('''
            SELECT cs.*, cp.process_name, cp.process_key
            FROM cons_sessions cs
            JOIN cons_processes cp ON cs.process_id = cp.id
            WHERE cs.created_by = ? AND cs.status = 'active'
            ORDER BY cs.created_at DESC
        ''', [user_id])

    # Get scan counts and header values for each session
    sessions_with_counts = []
    for sess in combined_sessions:
        sess_dict = dict(sess)

        # Get scan count
        table_name = get_detail_table_name(sess['process_key'])
        try:
            count_result = query_db(f'''
                SELECT COUNT(*) as scan_count FROM {table_name}
                WHERE session_id = ? AND is_deleted = 0
            ''', [sess['id']], one=True)
            sess_dict['scan_count'] = count_result['scan_count'] if count_result else 0
        except Exception:
            # Best effort: if the count query fails for any reason, show 0.
            # "except Exception" (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            sess_dict['scan_count'] = 0

        # Get first 5 required header fields with their values
        header_fields = query_db('''
            SELECT cpf.field_label, cshv.field_value
            FROM cons_process_fields cpf
            LEFT JOIN cons_session_header_values cshv
                ON cpf.id = cshv.field_id AND cshv.session_id = ?
            WHERE cpf.process_id = ?
              AND cpf.table_type = 'header'
              AND cpf.is_required = 1
              AND cpf.is_active = 1
            ORDER BY cpf.sort_order, cpf.id
            LIMIT 5
        ''', [sess['id'], sess['process_id']])

        sess_dict['header_preview'] = header_fields
        sessions_with_counts.append(sess_dict)

    # Get available process types for creating new sessions
    processes = query_db('''
        SELECT * FROM cons_processes WHERE is_active = 1 ORDER BY process_name
    ''')

    return render_template('conssheets/staff_index.html',
                           sessions=sessions_with_counts,
                           processes=processes,
                           show_archived=show_archived)
|
|
|
|
|
|
@bp.route('/new/<int:process_id>', methods=['GET', 'POST'])
@login_required
def new_session(process_id):
    """Start a new scanning session for an active process.

    GET renders the header form. POST checks that every required header
    field has a non-blank value, creates the ``cons_sessions`` row, stores
    each non-blank header value and redirects to the scanning page. When
    required values are missing the form is re-rendered with the submitted
    data.
    """
    proc = query_db('SELECT * FROM cons_processes WHERE id = ? AND is_active = 1', [process_id], one=True)
    if not proc:
        flash('Process not found', 'danger')
        return redirect(url_for('conssheets.index'))

    # Active header field definitions of this process
    header_fields = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'header' AND is_active = 1
        ORDER BY sort_order, id
    ''', [process_id])

    if request.method != 'POST':
        return render_template(
            'conssheets/new_session.html',
            process=proc,
            header_fields=header_fields,
            form_data={},
        )

    # Stripped form value per header field, keyed by field_name
    submitted = {
        f['field_name']: request.form.get(f['field_name'], '').strip()
        for f in header_fields
    }

    # Labels of required fields that were left blank
    missing_required = [
        f['field_label']
        for f in header_fields
        if f['is_required'] and not submitted[f['field_name']]
    ]
    if missing_required:
        flash(f'Required fields missing: {", ".join(missing_required)}', 'danger')
        return render_template(
            'conssheets/new_session.html',
            process=proc,
            header_fields=header_fields,
            form_data=request.form,
        )

    # Create the session row
    session_id = execute_db('''
        INSERT INTO cons_sessions (process_id, created_by)
        VALUES (?, ?)
    ''', [process_id, session['user_id']])

    # Persist only the header values that were actually filled in
    for f in header_fields:
        entered = submitted[f['field_name']]
        if not entered:
            continue
        execute_db('''
            INSERT INTO cons_session_header_values (session_id, field_id, field_value)
            VALUES (?, ?, ?)
        ''', [session_id, f['id'], entered])

    flash('Session created! Start scanning lots.', 'success')
    return redirect(url_for('conssheets.scan_session', session_id=session_id))
|
|
|
|
|
|
@bp.route('/session/<int:session_id>')
@login_required
def scan_session(session_id):
    """Render the main scanning interface for one session.

    Unknown or archived sessions flash a message and redirect to the module
    index. Otherwise the template receives the session row, its header
    values, the non-deleted scans from the process's detail table, the
    active detail field definitions (as dicts) and the duplicate-key field
    (as a dict, or ``None``).
    """
    # Session row joined with its process info
    sess = query_db('''
        SELECT cs.*, cp.process_name, cp.process_key, cp.id as process_id
        FROM cons_sessions cs
        JOIN cons_processes cp ON cs.process_id = cp.id
        WHERE cs.id = ?
    ''', [session_id], one=True)

    rejection = None
    if not sess:
        rejection = ('Session not found', 'danger')
    elif sess['status'] == 'archived':
        rejection = ('This session has been archived', 'warning')
    if rejection is not None:
        flash(*rejection)
        return redirect(url_for('conssheets.index'))

    # Header values for display
    header_values = query_db('''
        SELECT cpf.field_label, cpf.field_name, cshv.field_value
        FROM cons_session_header_values cshv
        JOIN cons_process_fields cpf ON cshv.field_id = cpf.id
        WHERE cshv.session_id = ?
        ORDER BY cpf.sort_order, cpf.id
    ''', [session_id])

    # Detail field definitions, converted to plain dicts for JSON serialization
    field_rows = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
        ORDER BY sort_order, id
    ''', [sess['process_id']])
    detail_fields = list(map(dict, field_rows)) if field_rows else []

    # Scans stored in the process's dynamic detail table
    table_name = get_detail_table_name(sess['process_key'])
    scans = query_db(f'''
        SELECT t.*, u.full_name as scanned_by_name
        FROM {table_name} t
        JOIN Users u ON t.scanned_by = u.user_id
        WHERE t.session_id = ? AND t.is_deleted = 0
        ORDER BY t.scanned_at DESC
    ''', [session_id])

    # Duplicate-key field, also as a plain dict (or None when not configured)
    dup_row = get_duplicate_key_field(sess['process_id'])
    dup_key_field = None
    if dup_row:
        dup_key_field = dict(dup_row)

    return render_template(
        'conssheets/scan_session.html',
        session=sess,
        header_values=header_values,
        scans=scans,
        detail_fields=detail_fields,
        dup_key_field=dup_key_field,
    )
|
|
|
|
|
|
@bp.route('/session/<int:session_id>/scan', methods=['POST'])
@login_required
def scan_lot(session_id):
    """Handle one scanned barcode for a session and return a JSON result.

    The JSON request body may contain ``barcode``, ``confirm_duplicate`` and
    ``field_values`` / ``extra_data``. The first active routing rule of the
    session's process whose ``match_pattern`` regex matches the barcode is
    selected (in ``line_number`` order), and its ``actions_json`` is run
    through ``execute_pipeline``. Failures are reported as
    ``{'success': False, 'message': ...}``.
    """
    from global_actions import execute_pipeline
    import re
    import json

    # 1. Look up the session; process_key determines the detail table name
    sess = query_db('''
        SELECT cs.*, cp.process_key, cp.id as process_id
        FROM cons_sessions cs
        JOIN cons_processes cp ON cs.process_id = cp.id
        WHERE cs.id = ?
    ''', [session_id], one=True)

    if not sess:
        return jsonify({'success': False, 'message': 'Session invalid'})

    # 2. Get data from the frontend. A missing, malformed or non-object
    # body is treated as an empty payload instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # str() guards against a numeric barcode in the JSON payload.
    barcode = str(data.get('barcode') or '').strip()

    # 3. Find matching rule (the routing)
    matched_rule = None
    if barcode:
        rules = query_db('SELECT * FROM cons_process_router WHERE process_id = ? AND is_active = 1 ORDER BY line_number ASC', [sess['process_id']])
        for rule in rules:
            try:
                if re.search(rule['match_pattern'], barcode):
                    matched_rule = rule
                    break
            except (re.error, TypeError):
                # Skip rules whose pattern is not a valid regex (or is NULL)
                continue

    if not matched_rule:
        return jsonify({'success': False, 'message': f"❌ No rule matched: {barcode}"})

    # 4. Execute pipeline (the processing)
    context = {
        'table_name': f"cons_proc_{sess['process_key']}_details",
        'session_id': session_id,
        'user_id': session.get('user_id'),

        # Forward the duplicate confirmation flag sent by the client
        'confirm_duplicate': data.get('confirm_duplicate', False),

        # Forward additional field inputs sent by the client
        'extra_data': data.get('field_values') or data.get('extra_data')
    }

    try:
        actions = json.loads(matched_rule['actions_json'])
        result = execute_pipeline(actions, barcode, context)
        return jsonify(result)
    except Exception as e:
        return jsonify({'success': False, 'message': f"System Error: {str(e)}"})
|
|
|
|
|
|
@bp.route('/session/<int:session_id>/detail/<int:detail_id>')
@login_required
def get_detail(session_id, detail_id):
    """Return one scanned detail row of a session as JSON, for editing.

    Responds with ``{'success': True, 'detail': {...}}`` on success, or
    ``{'success': False, 'message': ...}`` when the session or the detail row
    cannot be found.
    """
    def _failure(message):
        return jsonify({'success': False, 'message': message})

    parent = query_db('''
        SELECT cs.*, cp.process_key
        FROM cons_sessions cs
        JOIN cons_processes cp ON cs.process_id = cp.id
        WHERE cs.id = ?
    ''', [session_id], one=True)
    if not parent:
        return _failure('Session not found')

    # The detail rows live in a per-process table resolved from the process key.
    details_table = get_detail_table_name(parent['process_key'])
    lookup_sql = f'''
        SELECT t.*, u.full_name as scanned_by_name
        FROM {details_table} t
        JOIN Users u ON t.scanned_by = u.user_id
        WHERE t.id = ? AND t.session_id = ?
    '''
    row = query_db(lookup_sql, [detail_id, session_id], one=True)
    if not row:
        return _failure('Detail not found')

    return jsonify({'success': True, 'detail': dict(row)})
|
|
|
|
|
|
@bp.route('/session/<int:session_id>/detail/<int:detail_id>/update', methods=['POST'])
@login_required
def update_detail(session_id, detail_id):
    """Update the comment and field values of one scanned detail row.

    Reads a JSON object with ``comment`` and ``field_values`` (a mapping of
    field name to new value). Only names that are active detail fields of the
    session's process are written; any other key in ``field_values`` is
    ignored. Only the user who scanned the row, or an owner/admin, may update
    it.

    Responds with ``{'success': True}`` or
    ``{'success': False, 'message': ...}``.
    """
    sess = query_db('''
        SELECT cs.*, cp.process_key, cp.id as process_id
        FROM cons_sessions cs
        JOIN cons_processes cp ON cs.process_id = cp.id
        WHERE cs.id = ?
    ''', [session_id], one=True)

    if not sess:
        return jsonify({'success': False, 'message': 'Session not found'})

    table_name = get_detail_table_name(sess['process_key'])

    detail = query_db(f'SELECT * FROM {table_name} WHERE id = ? AND session_id = ?', [detail_id, session_id], one=True)

    if not detail:
        return jsonify({'success': False, 'message': 'Detail not found'})

    # Check permission
    if detail['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
        return jsonify({'success': False, 'message': 'Permission denied'})

    # Reject a missing or non-object body rather than raising, and rather than
    # treating it as "clear the comment".
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'message': 'Invalid request body'})

    field_values = data.get('field_values') or {}
    if not isinstance(field_values, dict):
        return jsonify({'success': False, 'message': 'Invalid request body'})
    comment = data.get('comment', '')

    # Get all active detail fields for this process
    detail_fields = query_db('''
        SELECT * FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
    ''', [sess['process_id']])

    # Build the dynamic UPDATE. Column names come from the process field
    # configuration, never from the request; request values are bound as
    # parameters.
    set_clauses = ['comment = ?']
    values = [comment]

    for field in detail_fields or []:
        field_name = field['field_name']
        if field_name in field_values:
            set_clauses.append(f"{field_name} = ?")
            values.append(field_values[field_name])

    # Scope the write by session as well as id, mirroring the lookup above.
    values.extend([detail_id, session_id])

    update_sql = f'''
        UPDATE {table_name}
        SET {', '.join(set_clauses)}
        WHERE id = ? AND session_id = ?
    '''

    execute_db(update_sql, values)

    return jsonify({'success': True})
|
|
|
|
|
|
@bp.route('/session/<int:session_id>/detail/<int:detail_id>/delete', methods=['POST'])
@login_required
def delete_detail(session_id, detail_id):
    """Soft-delete one scanned detail row by setting its ``is_deleted`` flag.

    Only the user who scanned the row, or an owner/admin, may delete it.
    Responds with ``{'success': True}`` or
    ``{'success': False, 'message': ...}``.
    """
    def _failure(message):
        return jsonify({'success': False, 'message': message})

    parent = query_db('''
        SELECT cs.*, cp.process_key
        FROM cons_sessions cs
        JOIN cons_processes cp ON cs.process_id = cp.id
        WHERE cs.id = ?
    ''', [session_id], one=True)
    if not parent:
        return _failure('Session not found')

    details_table = get_detail_table_name(parent['process_key'])
    row = query_db(
        f'SELECT * FROM {details_table} WHERE id = ? AND session_id = ?',
        [detail_id, session_id],
        one=True,
    )
    if not row:
        return _failure('Detail not found')

    # The scanning user may delete their own row; anyone else needs an elevated role.
    scanned_by_caller = row['scanned_by'] == session['user_id']
    if not scanned_by_caller and session['role'] not in ['owner', 'admin']:
        return _failure('Permission denied')

    execute_db(f'UPDATE {details_table} SET is_deleted = 1 WHERE id = ?', [detail_id])
    return jsonify({'success': True})
|
|
|
|
|
|
@bp.route('/session/<int:session_id>/archive', methods=['POST'])
@login_required
def archive_session(session_id):
    """Archive (soft-delete) a session by setting its status to ``archived``.

    Only the session's creator, or an owner/admin, may archive it. A session
    whose status is ``deleted`` is refused: deleting and restoring are
    admin-only operations that also toggle the detail rows, so this route must
    not be usable to bring a deleted session back.

    Responds with ``{'success': True}`` or
    ``{'success': False, 'message': ...}``.
    """
    sess = query_db('SELECT * FROM cons_sessions WHERE id = ?', [session_id], one=True)

    if not sess:
        return jsonify({'success': False, 'message': 'Session not found'})

    # Check permission
    if sess['created_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
        return jsonify({'success': False, 'message': 'Permission denied'})

    if sess['status'] == 'deleted':
        return jsonify({'success': False, 'message': 'Session has been deleted'})

    # Bind the status as a parameter: a double-quoted "archived" is an
    # identifier in standard SQL and only works through a SQLite legacy quirk.
    execute_db('UPDATE cons_sessions SET status = ? WHERE id = ?', ['archived', session_id])

    return jsonify({'success': True})
|
|
|
|
@bp.route('/session/<int:session_id>/unarchive', methods=['POST'])
@login_required
def unarchive_session(session_id):
    """Unarchive a session by setting its status back to ``active``.

    Only the session's creator, or an owner/admin, may unarchive it. A session
    whose status is ``deleted`` is refused: restoring a deleted session is an
    admin-only operation that also restores its detail rows, so reactivating it
    here would bypass that check and leave the rows soft-deleted.

    Responds with ``{'success': True}`` or
    ``{'success': False, 'message': ...}``.
    """
    sess = query_db('SELECT * FROM cons_sessions WHERE id = ?', [session_id], one=True)

    if not sess:
        return jsonify({'success': False, 'message': 'Session not found'})

    # Check permission
    if sess['created_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
        return jsonify({'success': False, 'message': 'Permission denied'})

    if sess['status'] == 'deleted':
        return jsonify({'success': False, 'message': 'Session has been deleted'})

    # Bind the status as a parameter: a double-quoted "active" is an
    # identifier in standard SQL and only works through a SQLite legacy quirk.
    execute_db('UPDATE cons_sessions SET status = ? WHERE id = ?', ['active', session_id])

    return jsonify({'success': True})
|
|
|
|
|
|
@bp.route('/session/<int:session_id>/delete', methods=['POST'])
@role_required('owner', 'admin')
def delete_session(session_id):
    """Soft-delete a session and all of its detail rows (owner/admin only).

    Sets the session's status to ``deleted`` and flags every row of the
    process's detail table that belongs to the session with ``is_deleted = 1``.

    Responds with ``{'success': True}``, or
    ``{'success': False, 'message': 'Session not found'}``.
    """
    sess = query_db(
        'SELECT cs.*, cp.process_key FROM cons_sessions cs JOIN cons_processes cp ON cs.process_id = cp.id WHERE cs.id = ?',
        [session_id],
        one=True,
    )

    if not sess:
        return jsonify({'success': False, 'message': 'Session not found'})

    # Resolve the detail table before writing anything, so a failure here
    # cannot leave the session marked deleted with its rows untouched.
    table_name = get_detail_table_name(sess['process_key'])

    # Bind the status as a parameter: a double-quoted "deleted" is an
    # identifier in standard SQL and only works through a SQLite legacy quirk.
    execute_db('UPDATE cons_sessions SET status = ? WHERE id = ?', ['deleted', session_id])

    # Mark all detail rows as deleted
    execute_db(f'UPDATE {table_name} SET is_deleted = 1 WHERE session_id = ?', [session_id])

    return jsonify({'success': True})
|
|
|
|
|
|
@bp.route('/session/<int:session_id>/restore', methods=['POST'])
@role_required('owner', 'admin')
def restore_session(session_id):
    """Restore a deleted session and all of its detail rows (owner/admin only).

    Sets the session's status to ``active`` and clears ``is_deleted`` on every
    row of the process's detail table that belongs to the session. Note that
    this also un-deletes rows that were soft-deleted individually before the
    session itself was deleted.

    Responds with ``{'success': True}``, or
    ``{'success': False, 'message': 'Session not found'}``.
    """
    sess = query_db(
        'SELECT cs.*, cp.process_key FROM cons_sessions cs JOIN cons_processes cp ON cs.process_id = cp.id WHERE cs.id = ?',
        [session_id],
        one=True,
    )

    if not sess:
        return jsonify({'success': False, 'message': 'Session not found'})

    # Resolve the detail table before writing anything, so a failure here
    # cannot leave the session active with its rows still flagged deleted.
    table_name = get_detail_table_name(sess['process_key'])

    # Bind the status as a parameter: a double-quoted "active" is an
    # identifier in standard SQL and only works through a SQLite legacy quirk.
    execute_db('UPDATE cons_sessions SET status = ? WHERE id = ?', ['active', session_id])

    # Restore all detail rows
    execute_db(f'UPDATE {table_name} SET is_deleted = 0 WHERE session_id = ?', [session_id])

    return jsonify({'success': True})
|
|
|
|
@bp.route('/session/<int:session_id>/template')
@login_required
def download_import_template(session_id):
    """Build and return a blank Excel template for bulk import.

    The workbook has one sheet, "Import Data", whose single row lists the
    ``field_name`` of every active detail field of the session's process,
    ordered by ``sort_order``. Redirects to the index when the session does
    not exist.
    """
    from flask import Response
    from io import BytesIO
    import openpyxl

    session_row = query_db('SELECT process_id FROM cons_sessions WHERE id = ?', [session_id], one=True)
    if not session_row:
        return redirect(url_for('conssheets.index'))

    detail_columns = query_db('''
        SELECT field_name, field_label
        FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
        ORDER BY sort_order
    ''', [session_row['process_id']])

    # One header row: the import matches columns on these field names.
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = "Import Data"
    sheet.append([column['field_name'] for column in detail_columns])

    # Serialize in memory; getvalue() returns the whole buffer regardless of position.
    buffer = BytesIO()
    workbook.save(buffer)

    return Response(
        buffer.getvalue(),
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        headers={'Content-Disposition': 'attachment; filename=import_template.xlsx'},
    )
|
|
|
|
@bp.route('/session/<int:session_id>/import', methods=['POST'])
@login_required
def import_session_data(session_id):
    """Bulk-import detail rows for a session from an uploaded Excel workbook.

    The first row of the active sheet is read as headers. A column is imported
    only when its header (surrounding whitespace ignored) equals the
    ``field_name`` of an active detail field of the session's process; all
    other columns are ignored. Every following row that has at least one
    non-empty imported value is inserted into the process's detail table,
    stamped with the session id, the current time, the current user and
    ``is_deleted = 0``.

    The outcome is reported through ``flash``. The route redirects to the
    index when the session does not exist, and back to the session's scan page
    in every other case.
    """
    # Kept function-local, as in the original.
    import logging
    import openpyxl
    from datetime import datetime
    from flask import request, flash, redirect, url_for, session

    # 1. Get Session Info
    sess = query_db('''
        SELECT cs.*, cp.process_key
        FROM cons_sessions cs
        JOIN cons_processes cp ON cs.process_id = cp.id
        WHERE cs.id = ?
    ''', [session_id], one=True)

    if not sess:
        flash('Session not found', 'danger')
        return redirect(url_for('conssheets.index'))

    # 2. Check File
    if 'file' not in request.files:
        flash('No file uploaded', 'danger')
        return redirect(url_for('conssheets.scan_session', session_id=session_id))

    file = request.files['file']
    if file.filename == '':
        flash('No file selected', 'danger')
        return redirect(url_for('conssheets.scan_session', session_id=session_id))

    try:
        # 3. Read Excel
        wb = openpyxl.load_workbook(file)
        ws = wb.active

        # Get headers from first row
        headers = [cell.value for cell in ws[1]]

        # Get valid field names for this process
        valid_fields = query_db('''
            SELECT field_name
            FROM cons_process_fields
            WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
        ''', [sess['process_id']])
        valid_field_names = {f['field_name'] for f in valid_fields or []}

        # Map Excel column index -> DB column. Only whitelisted field names
        # ever reach the INSERT's column list.
        col_mapping = {}
        for idx, header in enumerate(headers):
            name = header.strip() if isinstance(header, str) else header
            if name and name in valid_field_names:
                col_mapping[idx] = name

        if not col_mapping:
            flash('Error: No matching columns found in Excel. Please use the template.', 'danger')
            return redirect(url_for('conssheets.scan_session', session_id=session_id))

        # 4. Process Rows
        table_name = f"cons_proc_{sess['process_key']}_details"
        rows_inserted = 0
        user_id = session.get('user_id')

        for row in ws.iter_rows(min_row=2, values_only=True):
            data = {
                col_mapping[col_idx]: value
                for col_idx, value in enumerate(row)
                if col_idx in col_mapping
            }

            # Skip rows with nothing to import. Testing for None/'' rather
            # than truthiness keeps rows whose only values are 0 or False, and
            # ignoring unmapped columns avoids inserting all-NULL records.
            if all(value is None or value == '' for value in data.values()):
                continue

            # Add Metadata
            data['session_id'] = session_id
            data['scanned_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            data['scanned_by'] = user_id
            data['is_deleted'] = 0

            # Dynamic Insert SQL
            columns = ', '.join(data)
            placeholders = ', '.join(['?'] * len(data))

            sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
            execute_db(sql, list(data.values()))
            rows_inserted += 1

        flash(f'Successfully imported {rows_inserted} records!', 'success')

    except Exception as e:
        # Top-level boundary: show the error to the user and log the traceback.
        # NOTE(review): rows inserted before the failure are presumably already
        # persisted by execute_db — confirm whether a partial import is acceptable.
        flash(f'Import Error: {str(e)}', 'danger')
        logging.getLogger(__name__).exception('Bulk import failed for session %s', session_id)

    return redirect(url_for('conssheets.scan_session', session_id=session_id))
|
|
|
|
@bp.route('/session/<int:session_id>/export')
@login_required
def export_session(session_id):
    """Export a session into its process's Excel template as a paged workbook.

    The template's active sheet is treated as one page of ``page_height`` rows.
    For each page needed to hold the session's non-deleted scans
    (``rows_per_page`` per page, default 30):

    * header values are written to their configured cell, shifted down by
      ``page_idx * page_height`` rows;
    * scans are written one per row starting at ``detail_start_row``
      (default 10) plus the same offset, using each detail field's
      ``excel_cell`` as a column letter;
    * a manual page break is inserted after the page's last row.

    Rows after the last page, up to row 4999, are hidden and the print area is
    set to the configured (or detected) column range. Cell writes are
    best-effort: a cell that cannot be written is logged and skipped.

    Redirects with a flash message when the session or its template is
    missing, or when the page height is not configured.
    """
    import logging
    import math
    from datetime import datetime
    from io import BytesIO

    import openpyxl
    from flask import Response
    from openpyxl.utils.cell import coordinate_from_string, get_column_letter
    from openpyxl.worksheet.pagebreak import Break

    log = logging.getLogger(__name__)

    def _coerce_number(value, field_type):
        """Convert a non-empty value per the field's declared numeric type.

        Falls back to the raw value when conversion fails, so the data still
        appears in the export instead of being dropped.
        """
        if not value:
            return value
        try:
            if field_type == 'REAL':
                return float(value)
            if field_type == 'INTEGER':
                return int(value)
        except (TypeError, ValueError):
            log.warning('Export session %s: %r is not a valid %s; writing it unchanged',
                        session_id, value, field_type)
        return value

    sess = query_db('''
        SELECT cs.*, cp.process_name, cp.process_key, cp.id as process_id,
        cp.template_file, cp.template_filename,
        cp.rows_per_page, cp.detail_start_row, cp.page_height,
        cp.print_start_col, cp.print_end_col
        FROM cons_sessions cs
        JOIN cons_processes cp ON cs.process_id = cp.id
        WHERE cs.id = ?
    ''', [session_id], one=True)

    if not sess or not sess['template_file']:
        flash('Session or Template not found', 'danger')
        return redirect(url_for('conssheets.index'))

    # Validation
    page_height = sess['page_height']
    rows_per_page = sess['rows_per_page'] or 30
    detail_start_row = sess['detail_start_row'] or 10

    if not page_height:
        flash('Configuration Error: Page Height is not set.', 'danger')
        return redirect(url_for('conssheets.scan_session', session_id=session_id))

    # Get Data
    header_fields = query_db('''
        SELECT cpf.field_name, cpf.excel_cell, cshv.field_value
        FROM cons_process_fields cpf
        LEFT JOIN cons_session_header_values cshv ON cpf.id = cshv.field_id AND cshv.session_id = ?
        WHERE cpf.process_id = ? AND cpf.table_type = 'header' AND cpf.is_active = 1 AND cpf.excel_cell IS NOT NULL
    ''', [session_id, sess['process_id']]) or []

    detail_fields = query_db('''
        SELECT field_name, excel_cell, field_type
        FROM cons_process_fields
        WHERE process_id = ? AND table_type = 'detail' AND is_active = 1 AND excel_cell IS NOT NULL
        ORDER BY sort_order, id
    ''', [sess['process_id']]) or []

    table_name = f'cons_proc_{sess["process_key"]}_details'
    scans = query_db(f'''
        SELECT * FROM {table_name}
        WHERE session_id = ? AND is_deleted = 0
        ORDER BY scanned_at ASC
    ''', [session_id]) or []

    # Setup Excel
    wb = openpyxl.load_workbook(BytesIO(sess['template_file']))
    ws = wb.active

    # Clear existing breaks
    ws.row_breaks.brk = []
    ws.col_breaks.brk = []

    # Calculate Pages Needed (always at least one page, even with no scans)
    total_items = len(scans)
    total_pages = math.ceil(total_items / rows_per_page) if total_items > 0 else 1

    # Parse header coordinates once; they are the same for every page.
    header_cells = []
    for field in header_fields:
        if not (field['excel_cell'] and field['field_value']):
            continue
        try:
            col_letter, row_str = coordinate_from_string(field['excel_cell'])
            header_cells.append((col_letter, int(row_str), field['field_value']))
        except Exception:
            log.warning('Export session %s: skipping header field %r with invalid cell %r',
                        session_id, field['field_name'], field['excel_cell'])

    # --- MAIN LOOP ---
    for page_idx in range(total_pages):
        page_offset = page_idx * page_height

        # 1. Fill Header
        for col_letter, base_row, header_value in header_cells:
            cell_ref = f"{col_letter}{base_row + page_offset}"
            try:
                ws[cell_ref] = header_value
            except Exception:
                # Best effort: one unwritable cell must not abort the export.
                log.warning('Export session %s: could not write header cell %s',
                            session_id, cell_ref, exc_info=True)

        # 2. Fill Details
        start_idx = page_idx * rows_per_page
        page_scans = scans[start_idx:start_idx + rows_per_page]

        for i, scan in enumerate(page_scans):
            target_row = detail_start_row + page_offset + i
            for field in detail_fields:
                if not field['excel_cell']:
                    continue
                try:
                    # For detail fields excel_cell holds only the column letter.
                    cell_ref = f"{field['excel_cell'].upper().strip()}{target_row}"
                    ws[cell_ref] = _coerce_number(scan[field['field_name']], field['field_type'])
                except Exception:
                    # Best effort: one unwritable cell must not abort the export.
                    log.warning('Export session %s: could not write field %r at row %s',
                                session_id, field['field_name'], target_row, exc_info=True)

        # 3. Force a page break after this page's last row, i.e. before the
        #    next page's header (hence no +1).
        if page_idx < total_pages - 1:
            next_page_start_row = (page_idx + 1) * page_height
            ws.row_breaks.append(Break(id=next_page_start_row))

    # --- CLEANUP (Hide Unused Rows) ---
    last_used_row = total_pages * page_height
    SAFE_MAX_ROW = 5000

    for row_num in range(last_used_row + 1, SAFE_MAX_ROW):
        ws.row_dimensions[row_num].hidden = True

    # --- FINAL POLISH (Print Area) ---
    # `or` supplies the default when the DB value is None or empty.
    start_col = sess['print_start_col'] or 'A'
    # Fall back to auto-detection when the end column was left blank.
    end_col = sess['print_end_col'] or get_column_letter(ws.max_column)

    ws.print_area = f"{start_col}1:{end_col}{last_used_row}"

    if ws.sheet_properties.pageSetUpPr:
        ws.sheet_properties.pageSetUpPr.fitToPage = False

    # Save
    output = BytesIO()
    wb.save(output)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    base_filename = f"{sess['process_key']}_{session_id}_{timestamp}"

    return Response(
        output.getvalue(),
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        headers={'Content-Disposition': f'attachment; filename={base_filename}.xlsx'},
    )