v0.13.0 - Add Consumption Sheets module

New module for tracking production consumption with lot scanning:
- Admin configuration for process types (AD WIP, etc.)
- Dynamic table creation per process
- Flexible header/detail field definitions with Excel cell mapping
- Duplicate detection with configurable key field
- Staff scanning interface with duplicate warnings (same session/cross session)
- Excel export using uploaded templates with multi-page support
- Template settings for rows per page and detail start row
This commit is contained in:
Javier
2026-01-29 12:33:34 -06:00
parent b11421a8f5
commit d955a13f3d
11 changed files with 697 additions and 402 deletions

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, session
from db import query_db, execute_db
from db import query_db, execute_db, get_db
from utils import role_required
cons_sheets_bp = Blueprint('cons_sheets', __name__)
@@ -49,12 +49,101 @@ def create_process():
VALUES (?, ?, ?)
''', [process_key, process_name, session['user_id']])
# Create dynamic detail table for this process
create_process_detail_table(process_key)
flash(f'Process "{process_name}" created successfully!', 'success')
return redirect(url_for('cons_sheets.process_detail', process_id=process_id))
return render_template('cons_sheets/create_process.html')
import sqlite3
import os
def get_db_path():
"""Get the database path"""
db_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'database', 'scanlook.db')
print(f"DEBUG: Database path is: {db_path}")
print(f"DEBUG: Path exists: {os.path.exists(db_path)}")
return db_path
def create_process_detail_table(process_key):
"""Create the dynamic detail table for a process with system columns"""
table_name = f'cons_proc_{process_key}_details'
print(f"DEBUG: Creating table {table_name}")
try:
db_path = get_db_path()
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {table_name} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
scanned_by INTEGER NOT NULL,
scanned_at DATETIME DEFAULT CURRENT_TIMESTAMP,
duplicate_status TEXT DEFAULT 'normal' CHECK(duplicate_status IN ('normal', 'dup_same_session', 'dup_other_session')),
duplicate_info TEXT,
comment TEXT,
is_deleted INTEGER DEFAULT 0,
FOREIGN KEY (session_id) REFERENCES cons_sessions(id),
FOREIGN KEY (scanned_by) REFERENCES Users(user_id)
)
''')
# Create index on session_id
cursor.execute(f'CREATE INDEX IF NOT EXISTS idx_{process_key}_session ON {table_name}(session_id, is_deleted)')
conn.commit()
conn.close()
print(f"DEBUG: Table {table_name} created successfully!")
except Exception as e:
print(f"ERROR creating table {table_name}: {e}")
def add_column_to_detail_table(process_key, field_name, field_type):
"""Add a column to the process detail table"""
table_name = f'cons_proc_{process_key}_details'
# Map field types to SQLite types
sqlite_type = 'TEXT'
if field_type == 'INTEGER':
sqlite_type = 'INTEGER'
elif field_type == 'REAL':
sqlite_type = 'REAL'
conn = sqlite3.connect(get_db_path())
cursor = conn.cursor()
try:
cursor.execute(f'ALTER TABLE {table_name} ADD COLUMN {field_name} {sqlite_type}')
conn.commit()
except Exception as e:
# Column might already exist
print(f"Note: Could not add column {field_name}: {e}")
finally:
conn.close()
def rename_column_in_detail_table(process_key, old_name, new_name):
"""Rename a column (for soft delete)"""
table_name = f'cons_proc_{process_key}_details'
conn = sqlite3.connect(get_db_path())
cursor = conn.cursor()
try:
cursor.execute(f'ALTER TABLE {table_name} RENAME COLUMN {old_name} TO {new_name}')
conn.commit()
except Exception as e:
print(f"Note: Could not rename column {old_name}: {e}")
finally:
conn.close()
@cons_sheets_bp.route('/admin/consumption-sheets/<int:process_id>')
@role_required('owner', 'admin')
def process_detail(process_id):
@@ -278,13 +367,20 @@ def add_field(process_id, table_type):
''', [process_id, table_type], one=True)
sort_order = (max_sort['max_sort'] or 0) + 1
# For detail fields, check for duplicate key checkbox
is_duplicate_key = 1 if (table_type == 'detail' and request.form.get('is_duplicate_key')) else 0
# Insert the field
execute_db('''
INSERT INTO cons_process_fields
(process_id, table_type, field_name, field_label, field_type, max_length, is_required, sort_order, excel_cell)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
(process_id, table_type, field_name, field_label, field_type, max_length, is_required, is_duplicate_key, sort_order, excel_cell)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', [process_id, table_type, field_name, field_label, field_type,
int(max_length) if max_length else None, is_required, sort_order, excel_cell or None])
int(max_length) if max_length else None, is_required, is_duplicate_key, sort_order, excel_cell or None])
# For detail fields, also add the column to the dynamic table
if table_type == 'detail':
add_column_to_detail_table(process['process_key'], field_name, field_type)
flash(f'Field "{field_label}" added successfully!', 'success')
return redirect(url_for('cons_sheets.process_fields', process_id=process_id))
@@ -310,6 +406,7 @@ def edit_field(process_id, field_id):
field_type = request.form.get('field_type', 'TEXT')
max_length = request.form.get('max_length', '')
is_required = 1 if request.form.get('is_required') else 0
is_duplicate_key = 1 if (field['table_type'] == 'detail' and request.form.get('is_duplicate_key')) else 0
excel_cell = request.form.get('excel_cell', '').strip().upper()
if not field_label:
@@ -318,9 +415,9 @@ def edit_field(process_id, field_id):
execute_db('''
UPDATE cons_process_fields
SET field_label = ?, field_type = ?, max_length = ?, is_required = ?, excel_cell = ?
SET field_label = ?, field_type = ?, max_length = ?, is_required = ?, is_duplicate_key = ?, excel_cell = ?
WHERE id = ?
''', [field_label, field_type, int(max_length) if max_length else None, is_required, excel_cell or None, field_id])
''', [field_label, field_type, int(max_length) if max_length else None, is_required, is_duplicate_key, excel_cell or None, field_id])
flash(f'Field "{field_label}" updated successfully!', 'success')
return redirect(url_for('cons_sheets.process_fields', process_id=process_id))
@@ -335,13 +432,20 @@ def edit_field(process_id, field_id):
def delete_field(process_id, field_id):
"""Soft-delete a field (rename column, set is_active = 0)"""
field = query_db('SELECT * FROM cons_process_fields WHERE id = ? AND process_id = ?', [field_id, process_id], one=True)
process = query_db('SELECT * FROM cons_processes WHERE id = ?', [process_id], one=True)
if not field:
if not field or not process:
return jsonify({'success': False, 'message': 'Field not found'})
# Soft delete: set is_active = 0
execute_db('UPDATE cons_process_fields SET is_active = 0 WHERE id = ?', [field_id])
# For detail fields, rename the column to preserve data
if field['table_type'] == 'detail':
old_name = field['field_name']
new_name = f"Del_{field_id}_{old_name}"
rename_column_in_detail_table(process['process_key'], old_name, new_name)
return jsonify({'success': True, 'message': f'Field "{field["field_label"]}" deleted'})
@@ -351,6 +455,20 @@ def delete_field(process_id, field_id):
from utils import login_required
def get_detail_table_name(process_key):
"""Get the dynamic detail table name for a process"""
return f'cons_proc_{process_key}_details'
def get_duplicate_key_field(process_id):
"""Get the field marked as duplicate key for a process"""
return query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_duplicate_key = 1 AND is_active = 1
LIMIT 1
''', [process_id], one=True)
@cons_sheets_bp.route('/cons-sheets')
@login_required
def index():
@@ -368,23 +486,38 @@ def index():
flash('You do not have access to this module', 'danger')
return redirect(url_for('home'))
# Get user's active sessions with process info
# Get user's active sessions with process info and scan counts
active_sessions = query_db('''
SELECT cs.*, cp.process_name, cp.process_key,
(SELECT COUNT(*) FROM cons_session_details WHERE session_id = cs.id AND is_deleted = 0) as scan_count
SELECT cs.*, cp.process_name, cp.process_key
FROM cons_sessions cs
JOIN cons_processes cp ON cs.process_id = cp.id
WHERE cs.created_by = ? AND cs.status = 'active'
ORDER BY cs.created_at DESC
''', [user_id])
# Get scan counts for each session from their dynamic tables
sessions_with_counts = []
for sess in active_sessions:
table_name = get_detail_table_name(sess['process_key'])
try:
count_result = query_db(f'''
SELECT COUNT(*) as scan_count FROM {table_name}
WHERE session_id = ? AND is_deleted = 0
''', [sess['id']], one=True)
sess_dict = dict(sess)
sess_dict['scan_count'] = count_result['scan_count'] if count_result else 0
except:
sess_dict = dict(sess)
sess_dict['scan_count'] = 0
sessions_with_counts.append(sess_dict)
# Get available process types for creating new sessions
processes = query_db('''
SELECT * FROM cons_processes WHERE is_active = 1 ORDER BY process_name
''')
return render_template('cons_sheets/staff_index.html',
sessions=active_sessions,
sessions=sessions_with_counts,
processes=processes)
@@ -474,79 +607,90 @@ def scan_session(session_id):
ORDER BY cpf.sort_order, cpf.id
''', [session_id])
# Get scanned details
scans = query_db('''
SELECT csd.*, u.full_name as scanned_by_name
FROM cons_session_details csd
JOIN Users u ON csd.scanned_by = u.user_id
WHERE csd.session_id = ? AND csd.is_deleted = 0
ORDER BY csd.scanned_at DESC
''', [session_id])
# Get detail fields for reference
detail_fields = query_db('''
# Get detail fields for this process (convert to dicts for JSON serialization)
detail_fields_rows = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
ORDER BY sort_order, id
''', [sess['process_id']])
detail_fields = [dict(row) for row in detail_fields_rows] if detail_fields_rows else []
# Get scanned details from the dynamic table
table_name = get_detail_table_name(sess['process_key'])
scans = query_db(f'''
SELECT t.*, u.full_name as scanned_by_name
FROM {table_name} t
JOIN Users u ON t.scanned_by = u.user_id
WHERE t.session_id = ? AND t.is_deleted = 0
ORDER BY t.scanned_at DESC
''', [session_id])
# Get the duplicate key field (convert to dict for JSON)
dup_key_field_row = get_duplicate_key_field(sess['process_id'])
dup_key_field = dict(dup_key_field_row) if dup_key_field_row else None
return render_template('cons_sheets/scan_session.html',
session=sess,
header_values=header_values,
scans=scans,
detail_fields=detail_fields)
detail_fields=detail_fields,
dup_key_field=dup_key_field)
@cons_sheets_bp.route('/cons-sheets/session/<int:session_id>/scan', methods=['POST'])
@login_required
def scan_lot(session_id):
"""Process a lot scan with duplicate detection"""
sess = query_db('SELECT * FROM cons_sessions WHERE id = ? AND status = "active"', [session_id], one=True)
"""Process a scan with duplicate detection using dynamic tables"""
sess = query_db('''
SELECT cs.*, cp.process_key, cp.id as process_id
FROM cons_sessions cs
JOIN cons_processes cp ON cs.process_id = cp.id
WHERE cs.id = ? AND cs.status = 'active'
''', [session_id], one=True)
if not sess:
return jsonify({'success': False, 'message': 'Session not found or archived'})
data = request.get_json()
lot_number = data.get('lot_number', '').strip()
item_number = data.get('item_number', '').strip()
weight = data.get('weight')
field_values = data.get('field_values', {}) # Dict of field_name: value
confirm_duplicate = data.get('confirm_duplicate', False)
check_only = data.get('check_only', False)
if not lot_number:
return jsonify({'success': False, 'message': 'Lot number required'})
# Get the duplicate key field
dup_key_field = get_duplicate_key_field(sess['process_id'])
if not check_only and weight is None:
return jsonify({'success': False, 'message': 'Weight required'})
if not dup_key_field:
return jsonify({'success': False, 'message': 'No duplicate key field configured for this process'})
if not check_only:
try:
weight = float(weight)
except (ValueError, TypeError):
return jsonify({'success': False, 'message': 'Invalid weight value'})
dup_key_value = field_values.get(dup_key_field['field_name'], '').strip()
if not dup_key_value:
return jsonify({'success': False, 'message': f'{dup_key_field["field_label"]} is required'})
table_name = get_detail_table_name(sess['process_key'])
# Check for duplicates in SAME session
same_session_dup = query_db('''
SELECT * FROM cons_session_details
WHERE session_id = ? AND lot_number = ? AND is_deleted = 0
''', [session_id, lot_number], one=True)
same_session_dup = query_db(f'''
SELECT * FROM {table_name}
WHERE session_id = ? AND {dup_key_field['field_name']} = ? AND is_deleted = 0
''', [session_id, dup_key_value], one=True)
# Check for duplicates in OTHER sessions (with header info for context)
other_session_dup = query_db('''
SELECT csd.*, cs.id as other_session_id, cs.created_at as other_session_date,
# Check for duplicates in OTHER sessions (need to check all sessions of same process type)
other_session_dup = query_db(f'''
SELECT t.*, cs.id as other_session_id, cs.created_at as other_session_date,
u.full_name as other_user,
(SELECT field_value FROM cons_session_header_values
WHERE session_id = cs.id AND field_id = (
SELECT id FROM cons_process_fields
WHERE process_id = cs.process_id AND field_name LIKE '%wo%' AND is_active = 1 LIMIT 1
)) as other_wo
FROM cons_session_details csd
JOIN cons_sessions cs ON csd.session_id = cs.id
JOIN Users u ON csd.scanned_by = u.user_id
WHERE csd.lot_number = ? AND csd.session_id != ? AND csd.is_deleted = 0
ORDER BY csd.scanned_at DESC
FROM {table_name} t
JOIN cons_sessions cs ON t.session_id = cs.id
JOIN Users u ON t.scanned_by = u.user_id
WHERE t.{dup_key_field['field_name']} = ? AND t.session_id != ? AND t.is_deleted = 0
ORDER BY t.scanned_at DESC
LIMIT 1
''', [lot_number, session_id], one=True)
''', [dup_key_value, session_id], one=True)
duplicate_status = 'normal'
duplicate_info = None
@@ -586,19 +730,36 @@ def scan_lot(session_id):
'message': duplicate_info
})
# Insert the scan
detail_id = execute_db('''
INSERT INTO cons_session_details
(session_id, item_number, lot_number, weight, scanned_by, duplicate_status, duplicate_info)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', [session_id, item_number, lot_number, weight, session['user_id'], duplicate_status, duplicate_info])
# Get all active detail fields for this process
detail_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
ORDER BY sort_order, id
''', [sess['process_id']])
# Build dynamic INSERT statement
field_names = ['session_id', 'scanned_by', 'duplicate_status', 'duplicate_info']
field_placeholders = ['?', '?', '?', '?']
values = [session_id, session['user_id'], duplicate_status, duplicate_info]
for field in detail_fields:
field_names.append(field['field_name'])
field_placeholders.append('?')
values.append(field_values.get(field['field_name'], ''))
insert_sql = f'''
INSERT INTO {table_name} ({', '.join(field_names)})
VALUES ({', '.join(field_placeholders)})
'''
detail_id = execute_db(insert_sql, values)
# If this is a same-session duplicate, update the original scan too
updated_entry_ids = []
if duplicate_status == 'dup_same_session' and same_session_dup:
execute_db('''
UPDATE cons_session_details
SET duplicate_status = 'dup_same_session', duplicate_info = 'Duplicate lot'
execute_db(f'''
UPDATE {table_name}
SET duplicate_status = 'dup_same_session', duplicate_info = 'Duplicate'
WHERE id = ?
''', [same_session_dup['id']])
updated_entry_ids.append(same_session_dup['id'])
@@ -611,16 +772,28 @@ def scan_lot(session_id):
})
@cons_sheets_bp.route('/cons-sheets/detail/<int:detail_id>')
@cons_sheets_bp.route('/cons-sheets/session/<int:session_id>/detail/<int:detail_id>')
@login_required
def get_detail(detail_id):
def get_detail(session_id, detail_id):
"""Get detail info for editing"""
detail = query_db('''
SELECT csd.*, u.full_name as scanned_by_name
FROM cons_session_details csd
JOIN Users u ON csd.scanned_by = u.user_id
WHERE csd.id = ?
''', [detail_id], one=True)
sess = query_db('''
SELECT cs.*, cp.process_key
FROM cons_sessions cs
JOIN cons_processes cp ON cs.process_id = cp.id
WHERE cs.id = ?
''', [session_id], one=True)
if not sess:
return jsonify({'success': False, 'message': 'Session not found'})
table_name = get_detail_table_name(sess['process_key'])
detail = query_db(f'''
SELECT t.*, u.full_name as scanned_by_name
FROM {table_name} t
JOIN Users u ON t.scanned_by = u.user_id
WHERE t.id = ? AND t.session_id = ?
''', [detail_id, session_id], one=True)
if not detail:
return jsonify({'success': False, 'message': 'Detail not found'})
@@ -628,11 +801,23 @@ def get_detail(detail_id):
return jsonify({'success': True, 'detail': dict(detail)})
@cons_sheets_bp.route('/cons-sheets/detail/<int:detail_id>/update', methods=['POST'])
@cons_sheets_bp.route('/cons-sheets/session/<int:session_id>/detail/<int:detail_id>/update', methods=['POST'])
@login_required
def update_detail(detail_id):
def update_detail(session_id, detail_id):
"""Update a scanned detail"""
detail = query_db('SELECT * FROM cons_session_details WHERE id = ?', [detail_id], one=True)
sess = query_db('''
SELECT cs.*, cp.process_key, cp.id as process_id
FROM cons_sessions cs
JOIN cons_processes cp ON cs.process_id = cp.id
WHERE cs.id = ?
''', [session_id], one=True)
if not sess:
return jsonify({'success': False, 'message': 'Session not found'})
table_name = get_detail_table_name(sess['process_key'])
detail = query_db(f'SELECT * FROM {table_name} WHERE id = ? AND session_id = ?', [detail_id, session_id], one=True)
if not detail:
return jsonify({'success': False, 'message': 'Detail not found'})
@@ -642,33 +827,54 @@ def update_detail(detail_id):
return jsonify({'success': False, 'message': 'Permission denied'})
data = request.get_json()
item_number = data.get('item_number', '').strip()
lot_number = data.get('lot_number', '').strip()
weight = data.get('weight')
field_values = data.get('field_values', {})
comment = data.get('comment', '')
if not lot_number:
return jsonify({'success': False, 'message': 'Lot number required'})
# Get all active detail fields for this process
detail_fields = query_db('''
SELECT * FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_active = 1
''', [sess['process_id']])
try:
weight = float(weight)
except (ValueError, TypeError):
return jsonify({'success': False, 'message': 'Invalid weight'})
# Build dynamic UPDATE statement
set_clauses = ['comment = ?']
values = [comment]
execute_db('''
UPDATE cons_session_details
SET item_number = ?, lot_number = ?, weight = ?, comment = ?
for field in detail_fields:
if field['field_name'] in field_values:
set_clauses.append(f"{field['field_name']} = ?")
values.append(field_values[field['field_name']])
values.append(detail_id)
update_sql = f'''
UPDATE {table_name}
SET {', '.join(set_clauses)}
WHERE id = ?
''', [item_number, lot_number, weight, comment, detail_id])
'''
execute_db(update_sql, values)
return jsonify({'success': True})
@cons_sheets_bp.route('/cons-sheets/detail/<int:detail_id>/delete', methods=['POST'])
@cons_sheets_bp.route('/cons-sheets/session/<int:session_id>/detail/<int:detail_id>/delete', methods=['POST'])
@login_required
def delete_detail(detail_id):
def delete_detail(session_id, detail_id):
"""Soft-delete a scanned detail"""
detail = query_db('SELECT * FROM cons_session_details WHERE id = ?', [detail_id], one=True)
sess = query_db('''
SELECT cs.*, cp.process_key
FROM cons_sessions cs
JOIN cons_processes cp ON cs.process_id = cp.id
WHERE cs.id = ?
''', [session_id], one=True)
if not sess:
return jsonify({'success': False, 'message': 'Session not found'})
table_name = get_detail_table_name(sess['process_key'])
detail = query_db(f'SELECT * FROM {table_name} WHERE id = ? AND session_id = ?', [detail_id, session_id], one=True)
if not detail:
return jsonify({'success': False, 'message': 'Detail not found'})
@@ -677,7 +883,7 @@ def delete_detail(detail_id):
if detail['scanned_by'] != session['user_id'] and session['role'] not in ['owner', 'admin']:
return jsonify({'success': False, 'message': 'Permission denied'})
execute_db('UPDATE cons_session_details SET is_deleted = 1 WHERE id = ?', [detail_id])
execute_db(f'UPDATE {table_name} SET is_deleted = 1 WHERE id = ?', [detail_id])
return jsonify({'success': True})
@@ -697,4 +903,218 @@ def archive_session(session_id):
execute_db('UPDATE cons_sessions SET status = "archived" WHERE id = ?', [session_id])
return jsonify({'success': True})
return jsonify({'success': True})
@cons_sheets_bp.route('/cons-sheets/session/<int:session_id>/export')
@login_required
def export_session(session_id):
"""Export session to Excel using the process template"""
from flask import Response
from io import BytesIO
import openpyxl
from openpyxl.utils import get_column_letter, column_index_from_string
from copy import copy
from datetime import datetime
# Get session with process info
sess = query_db('''
SELECT cs.*, cp.process_name, cp.process_key, cp.id as process_id,
cp.template_file, cp.template_filename, cp.rows_per_page, cp.detail_start_row
FROM cons_sessions cs
JOIN cons_processes cp ON cs.process_id = cp.id
WHERE cs.id = ?
''', [session_id], one=True)
if not sess:
flash('Session not found', 'danger')
return redirect(url_for('cons_sheets.index'))
if not sess['template_file']:
flash('No template configured for this process', 'danger')
return redirect(url_for('cons_sheets.scan_session', session_id=session_id))
# Get header fields and values
header_fields = query_db('''
SELECT cpf.field_name, cpf.excel_cell, cshv.field_value
FROM cons_process_fields cpf
LEFT JOIN cons_session_header_values cshv ON cpf.id = cshv.field_id AND cshv.session_id = ?
WHERE cpf.process_id = ? AND cpf.table_type = 'header' AND cpf.is_active = 1 AND cpf.excel_cell IS NOT NULL
''', [session_id, sess['process_id']])
# Get detail fields with their column mappings
detail_fields = query_db('''
SELECT field_name, excel_cell, field_type
FROM cons_process_fields
WHERE process_id = ? AND table_type = 'detail' AND is_active = 1 AND excel_cell IS NOT NULL
ORDER BY sort_order, id
''', [sess['process_id']])
# Get all scanned details
table_name = get_detail_table_name(sess['process_key'])
scans = query_db(f'''
SELECT * FROM {table_name}
WHERE session_id = ? AND is_deleted = 0
ORDER BY scanned_at ASC
''', [session_id])
# Load the template
template_bytes = BytesIO(sess['template_file'])
wb = openpyxl.load_workbook(template_bytes)
ws = wb.active
rows_per_page = sess['rows_per_page'] or 30
detail_start_row = sess['detail_start_row'] or 11
# Calculate how many pages we need
total_scans = len(scans) if scans else 0
num_pages = max(1, (total_scans + rows_per_page - 1) // rows_per_page) if total_scans > 0 else 1
# Helper function to fill header values on a sheet
def fill_header(worksheet, header_fields):
for field in header_fields:
if field['excel_cell'] and field['field_value']:
try:
worksheet[field['excel_cell']] = field['field_value']
except:
pass # Skip invalid cell references
# Helper function to clear detail rows on a sheet
def clear_details(worksheet, detail_fields, start_row, num_rows):
for i in range(num_rows):
row_num = start_row + i
for field in detail_fields:
if field['excel_cell']:
try:
col_letter = field['excel_cell'].upper().strip()
cell_ref = f"{col_letter}{row_num}"
worksheet[cell_ref] = None
except:
pass
# Helper function to fill detail rows on a sheet
def fill_details(worksheet, scans_subset, detail_fields, start_row):
for i, scan in enumerate(scans_subset):
row_num = start_row + i
for field in detail_fields:
if field['excel_cell']:
try:
col_letter = field['excel_cell'].upper().strip()
cell_ref = f"{col_letter}{row_num}"
value = scan[field['field_name']]
# Convert to appropriate type
if field['field_type'] == 'REAL' and value:
value = float(value)
elif field['field_type'] == 'INTEGER' and value:
value = int(value)
worksheet[cell_ref] = value
except Exception as e:
print(f"Error filling cell: {e}")
# Fill the first page
fill_header(ws, header_fields)
first_page_scans = scans[:rows_per_page] if scans else []
fill_details(ws, first_page_scans, detail_fields, detail_start_row)
# Create additional pages if needed
for page_num in range(2, num_pages + 1):
# Copy the worksheet within the same workbook
new_ws = wb.copy_worksheet(ws)
new_ws.title = f"Page {page_num}"
# Clear detail rows (they have Page 1 data)
clear_details(new_ws, detail_fields, detail_start_row, rows_per_page)
# Fill details for this page
start_idx = (page_num - 1) * rows_per_page
end_idx = start_idx + rows_per_page
page_scans = scans[start_idx:end_idx]
fill_details(new_ws, page_scans, detail_fields, detail_start_row)
# Rename first sheet if we have multiple pages
if num_pages > 1:
ws.title = "Page 1"
# Save to BytesIO
output = BytesIO()
wb.save(output)
output.seek(0)
# Generate filename
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
base_filename = f"{sess['process_key']}_{session_id}_{timestamp}"
# Check if PDF export is requested
export_format = request.args.get('format', 'xlsx')
print(f"DEBUG: Export format requested: {export_format}")
if export_format == 'pdf':
# Use win32com to convert to PDF (requires Excel installed)
try:
import tempfile
import pythoncom
import win32com.client as win32
print("DEBUG: pywin32 imported successfully")
# Save Excel to temp file
temp_xlsx = tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False)
temp_xlsx.write(output.getvalue())
temp_xlsx.close()
print(f"DEBUG: Temp Excel saved to: {temp_xlsx.name}")
temp_pdf = temp_xlsx.name.replace('.xlsx', '.pdf')
# Initialize COM for this thread
pythoncom.CoInitialize()
print("DEBUG: COM initialized")
try:
excel = win32.Dispatch('Excel.Application')
excel.Visible = False
excel.DisplayAlerts = False
print("DEBUG: Excel application started")
workbook = excel.Workbooks.Open(temp_xlsx.name)
print("DEBUG: Workbook opened")
workbook.ExportAsFixedFormat(0, temp_pdf) # 0 = PDF format
print(f"DEBUG: Exported to PDF: {temp_pdf}")
workbook.Close(False)
excel.Quit()
print("DEBUG: Excel closed")
finally:
pythoncom.CoUninitialize()
# Read the PDF
with open(temp_pdf, 'rb') as f:
pdf_data = f.read()
print(f"DEBUG: PDF read, size: {len(pdf_data)} bytes")
# Clean up temp files
import os
os.unlink(temp_xlsx.name)
os.unlink(temp_pdf)
print("DEBUG: Temp files cleaned up")
return Response(
pdf_data,
mimetype='application/pdf',
headers={'Content-Disposition': f'attachment; filename={base_filename}.pdf'}
)
except ImportError as e:
print(f"ERROR: Import failed - {e}")
# Fall back to Excel export
except Exception as e:
print(f"ERROR: PDF export failed - {e}")
import traceback
traceback.print_exc()
# Fall back to Excel export
# Default: return Excel file
print("DEBUG: Returning Excel file")
return Response(
output.getvalue(),
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
headers={'Content-Disposition': f'attachment; filename={base_filename}.xlsx'}
)