You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

264 lines
10 KiB

from __future__ import annotations
from decimal import Decimal
from flask import Blueprint, flash, g, redirect, render_template, request, url_for
from app.auth import role_required
from app.extensions import db
from app.models import Account
from app.services import (
build_new_household_draft,
get_household_or_none,
is_head_name_taken,
list_enabled_options,
list_households,
normalize_search_term,
parse_entry_form,
serialize_admin_edit_snapshot,
serialize_entry_edit_snapshot,
write_audit_log,
)
quick_entry_bp = Blueprint("quick_entry", __name__, url_prefix="/quick-entry")
def _include_bride_side_param(source: object) -> bool:
if not hasattr(source, "get"):
return False
value = getattr(source, "get")("include_bride_side", "")
if not isinstance(value, str):
return False
return value.strip() == "1"
def _quick_entry_search_url(*, search_term: str = "", include_bride_side: bool = False) -> str:
params: dict[str, str] = {}
normalized_search_term = search_term.strip()
if normalized_search_term:
params["q"] = normalized_search_term
if include_bride_side:
params["include_bride_side"] = "1"
return url_for("quick_entry.index", **params)
def _option_id_by_code(options: list[object], code: str) -> int | None:
for option in options:
if getattr(option, "option_code", None) == code:
option_id = getattr(option, "id", None)
if isinstance(option_id, int):
return option_id
return None
def _quick_entry_index_context() -> dict[str, object]:
search_term = request.args.get("q", "").strip()
include_bride_side = _include_bride_side_param(request.args)
households = list_households(search_term=search_term, limit=20) if normalize_search_term(search_term) else []
if not include_bride_side:
households = [household for household in households if household.side != "bride_side"]
households = sorted(
households,
key=lambda household: household.total_gift_amount > Decimal("0.00"),
)
return {
"search_term": search_term,
"include_bride_side": include_bride_side,
"households": households,
}
def _is_partial_request(expected_partial: str) -> bool:
requested_partial = request.args.get("partial", "").strip()
header_partial = request.headers.get("X-HW-Partial", "").strip()
return requested_partial == expected_partial or header_partial == expected_partial
@quick_entry_bp.get("/")
@role_required("admin", "editor", "entry_only", "quick_editor")
def index() -> str:
context = _quick_entry_index_context()
if _is_partial_request("search-results"):
return render_template("quick_entry/_search_results.html", **context)
return render_template("quick_entry/index.html", **context)
@quick_entry_bp.get("/households/new")
@role_required("admin", "editor", "entry_only", "quick_editor")
def new_household() -> str:
search_term = request.args.get("q", "").strip()
include_bride_side = _include_bride_side_param(request.args)
gift_method_options = list_enabled_options("gift_method")
gift_scene_options = list_enabled_options("gift_scene")
return render_template(
"quick_entry/new.html",
search_term=search_term,
include_bride_side=include_bride_side,
default_gift_method_id=_option_id_by_code(gift_method_options, "cash"),
default_gift_scene_id=_option_id_by_code(gift_scene_options, "wedding_day"),
)
@quick_entry_bp.get("/households/<int:household_id>/edit")
@role_required("admin", "editor", "entry_only", "quick_editor")
def edit_household(household_id: int):
household = get_household_or_none(household_id)
if household is None:
flash("未找到要修改的户信息。", "error")
return redirect(url_for("quick_entry.index"))
gift_method_options = list_enabled_options("gift_method")
gift_scene_options = list_enabled_options("gift_scene")
context = {
"household": household,
"search_term": request.args.get("q", "").strip(),
"include_bride_side": _include_bride_side_param(request.args),
"gift_method_options": gift_method_options,
"gift_scene_options": gift_scene_options,
"default_gift_method_id": _option_id_by_code(gift_method_options, "cash"),
"default_gift_scene_id": _option_id_by_code(gift_scene_options, "wedding_day"),
}
if _is_partial_request("edit-modal"):
return render_template("quick_entry/_edit_modal.html", **context)
return render_template("quick_entry/edit.html", **context)
@quick_entry_bp.post("/households/<int:household_id>/edit")
@role_required("admin", "editor", "entry_only", "quick_editor")
def update_household(household_id: int):
household = get_household_or_none(household_id)
if household is None:
flash("未找到要修改的户信息。", "error")
return redirect(url_for("quick_entry.index"))
search_term = request.form.get("q", "").strip()
include_bride_side = _include_bride_side_param(request.form)
redirect_to_search = _quick_entry_search_url(search_term=search_term, include_bride_side=include_bride_side)
redirect_to_self = url_for(
"quick_entry.edit_household",
household_id=household_id,
q=search_term,
**({"include_bride_side": "1"} if include_bride_side else {}),
)
submitted_version = request.form.get("version", "").strip()
try:
version = int(submitted_version)
except ValueError:
flash("表单版本无效,请重新打开该户信息后再试。", "error")
return redirect(redirect_to_self)
if version != household.version:
flash("该户信息已被其他人更新,请刷新后重新确认再保存。", "warning")
return redirect(redirect_to_self)
gift_method_options = list_enabled_options("gift_method")
gift_scene_options = list_enabled_options("gift_scene")
payload, errors = parse_entry_form(
dict(request.form),
valid_method_ids={option.id for option in gift_method_options},
valid_scene_ids={option.id for option in gift_scene_options},
)
head_name = request.form.get("head_name", "").strip()
if not head_name:
errors.append("户主姓名不能为空。")
elif is_head_name_taken(head_name, excluding_household_id=household_id):
errors.append("该户主姓名已存在,请换一个名字或直接录入现有户。")
if errors:
for error in errors:
flash(error, "error")
return redirect(redirect_to_self)
before_snapshot = serialize_entry_edit_snapshot(household)
household.head_name = head_name
for field_name, value in payload.items():
setattr(household, field_name, value)
current_account = g.current_account
if isinstance(current_account, Account):
household.updated_by = current_account.id
if serialize_entry_edit_snapshot(household) == before_snapshot:
flash("没有检测到需要保存的变化。", "warning")
db.session.rollback()
return redirect(redirect_to_search)
household.version += 1
after_snapshot = serialize_entry_edit_snapshot(household)
write_audit_log(
action_type="update_household_entry",
target_type="household",
actor=current_account if isinstance(current_account, Account) else None,
target_id=household.id,
target_display_name=household.head_name,
before_data=before_snapshot,
after_data=after_snapshot,
commit=False,
)
db.session.commit()
flash(f"已保存 {household.head_name} 的礼金信息。", "success")
return redirect(redirect_to_search)
@quick_entry_bp.post("/households")
@role_required("admin", "editor", "entry_only", "quick_editor")
def create_household():
search_term = request.form.get("q", "").strip() or request.form.get("head_name", "").strip()
include_bride_side = _include_bride_side_param(request.form)
redirect_to_search = _quick_entry_search_url(search_term=search_term, include_bride_side=include_bride_side)
redirect_to_self = url_for(
"quick_entry.new_household",
q=search_term,
**({"include_bride_side": "1"} if include_bride_side else {}),
)
gift_method_options = list_enabled_options("gift_method")
gift_scene_options = list_enabled_options("gift_scene")
payload, errors = parse_entry_form(
dict(request.form),
valid_method_ids={option.id for option in gift_method_options},
valid_scene_ids={option.id for option in gift_scene_options},
)
if errors:
for error in errors:
flash(error, "error")
return redirect(redirect_to_self)
head_name = request.form.get("head_name", "").strip()
if not head_name:
flash("户主姓名不能为空。", "error")
return redirect(redirect_to_self)
if is_head_name_taken(head_name):
flash("该户主姓名已存在,请先搜索现有户后直接录入礼金。", "error")
return redirect(redirect_to_self)
household = build_new_household_draft()
household.head_name = head_name
household.note = request.form.get("note", "").strip() or None
for field_name, value in payload.items():
setattr(household, field_name, value)
if household.actual_attendee_count > 0 and household.attendance_status == "pending":
household.attendance_status = "attending"
current_account = g.current_account
if isinstance(current_account, Account):
household.created_by = current_account.id
household.updated_by = current_account.id
db.session.add(household)
db.session.flush()
write_audit_log(
action_type="create_household_quick_entry",
target_type="household",
actor=current_account if isinstance(current_account, Account) else None,
target_id=household.id,
target_display_name=household.head_name,
after_data=serialize_admin_edit_snapshot(household),
commit=False,
)
db.session.commit()
flash(f"已快速创建 {household.head_name} 并记录礼金。", "success")
return redirect(redirect_to_search)