You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
954 lines
33 KiB
954 lines
33 KiB
from __future__ import annotations
|
|
|
|
from collections.abc import Sequence
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from decimal import Decimal, InvalidOperation
|
|
import re
|
|
|
|
from sqlalchemy import Select, func
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
try:
|
|
from pypinyin import Style, lazy_pinyin
|
|
except ImportError: # pragma: no cover - dependency is installed in runtime verification
|
|
Style = None
|
|
|
|
def _lazy_pinyin_fallback(value: str, style: object | None = None) -> list[str]:
|
|
_ = style
|
|
return [value]
|
|
|
|
lazy_pinyin = _lazy_pinyin_fallback
|
|
|
|
from app.extensions import db
|
|
from app.models import Household, OptionItem
|
|
|
|
ATTENDANCE_STATUS_OPTIONS = ("pending", "attending", "absent", "partial")
|
|
INVITE_STATUS_OPTIONS = ("not_invited", "invited", "confirmed", "declined")
|
|
GIVEN_STATUS_OPTIONS = ("not_given", "given")
|
|
CHILD_RED_PACKET_STATUS_OPTIONS = ("not_given", "partial", "given")
|
|
SIDE_OPTIONS = ("groom_side", "bride_side", "both_side")
|
|
SORT_FIELD_OPTIONS = (
|
|
"updated_at",
|
|
"head_name",
|
|
"expected_attendee_count",
|
|
"actual_attendee_count",
|
|
"child_count",
|
|
"total_gift_amount",
|
|
)
|
|
SORT_ORDER_OPTIONS = ("asc", "desc")
|
|
HOMEPAGE_PER_PAGE_OPTIONS = (10, 20, 50)
|
|
DEFAULT_HOMEPAGE_PER_PAGE = 10
|
|
|
|
SIDE_LABELS = {
|
|
"groom_side": "男方",
|
|
"bride_side": "女方",
|
|
"both_side": "双方共有",
|
|
}
|
|
INVITE_STATUS_LABELS = {
|
|
"not_invited": "未邀请",
|
|
"invited": "已邀请",
|
|
"confirmed": "已确认",
|
|
"declined": "已婉拒",
|
|
}
|
|
ATTENDANCE_STATUS_LABELS = {
|
|
"pending": "待确认",
|
|
"attending": "到场",
|
|
"absent": "缺席",
|
|
"partial": "部分到场",
|
|
}
|
|
GIVEN_STATUS_LABELS = {
|
|
"not_given": "未发",
|
|
"given": "已发",
|
|
}
|
|
CHILD_RED_PACKET_STATUS_LABELS = {
|
|
"not_given": "未发",
|
|
"partial": "部分发放",
|
|
"given": "已发",
|
|
}
|
|
HOUSEHOLD_VALUE_LABELS = {
|
|
"side": SIDE_LABELS,
|
|
"invite_status": INVITE_STATUS_LABELS,
|
|
"attendance_status": ATTENDANCE_STATUS_LABELS,
|
|
"favor_status": GIVEN_STATUS_LABELS,
|
|
"candy_status": GIVEN_STATUS_LABELS,
|
|
"child_red_packet_status": CHILD_RED_PACKET_STATUS_LABELS,
|
|
}
|
|
SORT_FIELD_LABELS = {
|
|
"updated_at": "最近更新时间",
|
|
"head_name": "户主姓名",
|
|
"expected_attendee_count": "预计人数",
|
|
"actual_attendee_count": "实际到场人数",
|
|
"child_count": "儿童人数",
|
|
"total_gift_amount": "礼金金额",
|
|
}
|
|
|
|
ADMIN_EDITABLE_FIELDS = (
|
|
"household_code",
|
|
"head_name",
|
|
"phone",
|
|
"side",
|
|
"relation_category_option_id",
|
|
"relation_detail_option_id",
|
|
"tag_option_ids_json",
|
|
"invite_status",
|
|
"attendance_status",
|
|
"expected_attendee_count",
|
|
"actual_attendee_count",
|
|
"child_count",
|
|
"red_packet_child_count",
|
|
"total_gift_amount",
|
|
"gift_method_option_id",
|
|
"gift_scene_option_id",
|
|
"favor_status",
|
|
"candy_status",
|
|
"child_red_packet_status",
|
|
"note",
|
|
)
|
|
ENTRY_EDITABLE_FIELDS = (
|
|
"head_name",
|
|
"phone",
|
|
"attendance_status",
|
|
"actual_attendee_count",
|
|
"child_count",
|
|
"red_packet_child_count",
|
|
"total_gift_amount",
|
|
"gift_method_option_id",
|
|
"gift_scene_option_id",
|
|
"favor_status",
|
|
"candy_status",
|
|
"child_red_packet_status",
|
|
"note",
|
|
)
|
|
PHONE_PATTERN = re.compile(r"^[0-9+\-\s]{6,32}$")
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class HouseholdHomepagePage:
|
|
items: Sequence[Household]
|
|
filtered_households: Sequence[Household]
|
|
filtered_count: int
|
|
page: int
|
|
per_page: int
|
|
total_pages: int
|
|
has_prev: bool
|
|
has_next: bool
|
|
page_start: int
|
|
page_end: int
|
|
|
|
|
|
def household_query(
|
|
_search_term: str | None = None,
|
|
*,
|
|
side: str | None = None,
|
|
invite_status: str | None = None,
|
|
attendance_status: str | None = None,
|
|
tag_option_id: int | None = None,
|
|
sort_by: str | None = None,
|
|
sort_order: str | None = None,
|
|
) -> Select[tuple[Household]]:
|
|
query = (
|
|
db.select(Household)
|
|
.where(Household.deleted_at.is_(None))
|
|
.options(
|
|
selectinload(Household.relation_category_option),
|
|
selectinload(Household.relation_detail_option),
|
|
selectinload(Household.gift_method_option),
|
|
selectinload(Household.gift_scene_option),
|
|
selectinload(Household.members),
|
|
selectinload(Household.updated_by_account),
|
|
)
|
|
)
|
|
|
|
normalized_side = (side or "").strip()
|
|
if normalized_side in SIDE_OPTIONS:
|
|
query = query.where(Household.side == normalized_side)
|
|
|
|
normalized_invite_status = (invite_status or "").strip()
|
|
if normalized_invite_status in INVITE_STATUS_OPTIONS:
|
|
query = query.where(Household.invite_status == normalized_invite_status)
|
|
|
|
normalized_attendance_status = (attendance_status or "").strip()
|
|
if normalized_attendance_status in ATTENDANCE_STATUS_OPTIONS:
|
|
query = query.where(Household.attendance_status == normalized_attendance_status)
|
|
|
|
if tag_option_id is not None:
|
|
query = query.where(Household.tag_option_ids_json.is_not(None))
|
|
|
|
query = _apply_database_sort(query, sort_by=sort_by, sort_order=sort_order)
|
|
|
|
return query
|
|
|
|
|
|
def list_households(
|
|
*,
|
|
search_term: str | None = None,
|
|
side: str | None = None,
|
|
invite_status: str | None = None,
|
|
attendance_status: str | None = None,
|
|
tag_option_id: int | None = None,
|
|
sort_by: str | None = None,
|
|
sort_order: str | None = None,
|
|
limit: int | None = 50,
|
|
) -> Sequence[Household]:
|
|
result = db.session.execute(
|
|
household_query(
|
|
search_term,
|
|
side=side,
|
|
invite_status=invite_status,
|
|
attendance_status=attendance_status,
|
|
tag_option_id=tag_option_id,
|
|
sort_by=sort_by,
|
|
sort_order=sort_order,
|
|
),
|
|
)
|
|
households = result.scalars().all()
|
|
|
|
normalized_term = normalize_search_term(search_term)
|
|
if normalized_term:
|
|
households = [household for household in households if _household_matches_search(household, normalized_term)]
|
|
if tag_option_id is not None:
|
|
households = [household for household in households if tag_option_id in (household.tag_option_ids_json or [])]
|
|
|
|
if not _database_sort_covers(sort_by):
|
|
households = _sort_households(households, sort_by=sort_by, sort_order=sort_order)
|
|
if limit is None:
|
|
return households
|
|
|
|
return households[:limit]
|
|
|
|
|
|
def paginate_households_for_homepage(
|
|
*,
|
|
search_term: str | None = None,
|
|
side: str | None = None,
|
|
invite_status: str | None = None,
|
|
attendance_status: str | None = None,
|
|
tag_option_id: int | None = None,
|
|
sort_by: str | None = None,
|
|
sort_order: str | None = None,
|
|
page: int = 1,
|
|
per_page: int = DEFAULT_HOMEPAGE_PER_PAGE,
|
|
) -> HouseholdHomepagePage:
|
|
normalized_per_page = per_page if per_page in HOMEPAGE_PER_PAGE_OPTIONS else DEFAULT_HOMEPAGE_PER_PAGE
|
|
filtered_households = list_households(
|
|
search_term=search_term,
|
|
side=side,
|
|
invite_status=invite_status,
|
|
attendance_status=attendance_status,
|
|
tag_option_id=tag_option_id,
|
|
sort_by=sort_by,
|
|
sort_order=sort_order,
|
|
limit=None,
|
|
)
|
|
filtered_count = len(filtered_households)
|
|
total_pages = max(1, (filtered_count + normalized_per_page - 1) // normalized_per_page)
|
|
normalized_page = min(max(page, 1), total_pages)
|
|
start_index = (normalized_page - 1) * normalized_per_page
|
|
end_index = start_index + normalized_per_page
|
|
page_items = filtered_households[start_index:end_index]
|
|
page_start = start_index + 1 if filtered_count else 0
|
|
page_end = min(end_index, filtered_count) if filtered_count else 0
|
|
|
|
return HouseholdHomepagePage(
|
|
items=page_items,
|
|
filtered_households=filtered_households,
|
|
filtered_count=filtered_count,
|
|
page=normalized_page,
|
|
per_page=normalized_per_page,
|
|
total_pages=total_pages,
|
|
has_prev=normalized_page > 1,
|
|
has_next=normalized_page < total_pages,
|
|
page_start=page_start,
|
|
page_end=page_end,
|
|
)
|
|
|
|
|
|
def get_household_or_none(household_id: int) -> Household | None:
|
|
result = db.session.execute(
|
|
household_query().where(Household.id == household_id),
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
|
|
def get_homepage_stats() -> dict[str, int | Decimal]:
|
|
summary = db.session.execute(
|
|
db.select(
|
|
func.count(Household.id),
|
|
func.coalesce(func.sum(Household.expected_attendee_count), 0),
|
|
func.coalesce(func.sum(Household.child_count), 0),
|
|
func.coalesce(func.sum(Household.total_gift_amount), Decimal("0.00")),
|
|
).where(Household.deleted_at.is_(None)),
|
|
).one()
|
|
|
|
return {
|
|
"total_households": int(summary[0] or 0),
|
|
"total_expected_attendees": int(summary[1] or 0),
|
|
"total_children": int(summary[2] or 0),
|
|
"total_gift_amount": summary[3] or Decimal("0.00"),
|
|
}
|
|
|
|
|
|
def get_filtered_homepage_stats(households: Sequence[Household]) -> dict[str, int | Decimal]:
|
|
total_expected_attendees = sum(household.expected_attendee_count for household in households)
|
|
total_children = sum(household.child_count for household in households)
|
|
total_gift_amount = sum((household.total_gift_amount for household in households), start=Decimal("0.00"))
|
|
total_attending_households = sum(1 for household in households if household.attendance_status in {"attending", "partial"})
|
|
|
|
return {
|
|
"total_households": len(households),
|
|
"total_expected_attendees": total_expected_attendees,
|
|
"total_children": total_children,
|
|
"total_gift_amount": total_gift_amount,
|
|
"total_attending_households": total_attending_households,
|
|
}
|
|
|
|
|
|
def list_enabled_options(option_group: str) -> Sequence[OptionItem]:
|
|
result = db.session.execute(
|
|
db.select(OptionItem)
|
|
.where(
|
|
OptionItem.option_group == option_group,
|
|
OptionItem.is_enabled.is_(True),
|
|
)
|
|
.order_by(OptionItem.sort_order.asc(), OptionItem.id.asc()),
|
|
)
|
|
return result.scalars().all()
|
|
|
|
|
|
def parse_new_tag_labels(raw_value: str | None) -> tuple[list[str], list[str]]:
|
|
if not raw_value:
|
|
return [], []
|
|
|
|
normalized_labels: list[str] = []
|
|
errors: list[str] = []
|
|
for chunk in re.split(r"[\n,,]+", raw_value):
|
|
label = chunk.strip()
|
|
if not label:
|
|
continue
|
|
if len(label) > 32:
|
|
errors.append(f"标签“{label[:12]}...”过长,请控制在 32 个字符内。")
|
|
continue
|
|
if label not in normalized_labels:
|
|
normalized_labels.append(label)
|
|
|
|
return normalized_labels, errors
|
|
|
|
|
|
def parse_new_relation_category_label(raw_value: str | None) -> tuple[str | None, list[str]]:
|
|
if not raw_value:
|
|
return None, []
|
|
|
|
labels, errors = parse_new_tag_labels(raw_value)
|
|
if len(labels) > 1:
|
|
errors.append("一次只能新增一个关系分类。")
|
|
return None, errors
|
|
|
|
return (labels[0] if labels else None), errors
|
|
|
|
|
|
def ensure_tag_options(labels: Sequence[str], *, actor_id: int | None = None) -> list[OptionItem]:
|
|
if not labels:
|
|
return []
|
|
|
|
existing_options = list_enabled_options("tag")
|
|
existing_by_label = {option.option_label: option for option in existing_options}
|
|
sort_order_seed = max((option.sort_order for option in existing_options), default=0)
|
|
ensured: list[OptionItem] = []
|
|
|
|
for index, label in enumerate(labels, start=1):
|
|
existing = existing_by_label.get(label)
|
|
if existing is not None:
|
|
ensured.append(existing)
|
|
continue
|
|
|
|
option_code = slugify_option_label(label)
|
|
candidate_code = option_code
|
|
suffix = 2
|
|
while db.session.execute(
|
|
db.select(OptionItem.id).where(
|
|
OptionItem.option_group == "tag",
|
|
OptionItem.option_code == candidate_code,
|
|
).limit(1)
|
|
).scalar_one_or_none() is not None:
|
|
candidate_code = f"{option_code}-{suffix}"
|
|
suffix += 1
|
|
|
|
option = OptionItem(
|
|
option_group="tag",
|
|
option_code=candidate_code,
|
|
option_label=label,
|
|
sort_order=sort_order_seed + index,
|
|
is_enabled=True,
|
|
is_system=False,
|
|
created_by=actor_id,
|
|
updated_by=actor_id,
|
|
)
|
|
db.session.add(option)
|
|
db.session.flush()
|
|
existing_by_label[label] = option
|
|
ensured.append(option)
|
|
|
|
return ensured
|
|
|
|
|
|
def ensure_relation_category_option(label: str | None, *, actor_id: int | None = None) -> OptionItem | None:
|
|
if not label:
|
|
return None
|
|
|
|
existing_options = list_enabled_options("relation_category")
|
|
for option in existing_options:
|
|
if option.option_label == label:
|
|
return option
|
|
|
|
option_code = slugify_option_label(label)
|
|
candidate_code = option_code
|
|
suffix = 2
|
|
while db.session.execute(
|
|
db.select(OptionItem.id).where(
|
|
OptionItem.option_group == "relation_category",
|
|
OptionItem.option_code == candidate_code,
|
|
).limit(1)
|
|
).scalar_one_or_none() is not None:
|
|
candidate_code = f"{option_code}-{suffix}"
|
|
suffix += 1
|
|
|
|
sort_order_seed = max((option.sort_order for option in existing_options), default=0)
|
|
option = OptionItem(
|
|
option_group="relation_category",
|
|
option_code=candidate_code,
|
|
option_label=label,
|
|
sort_order=sort_order_seed + 10,
|
|
is_enabled=True,
|
|
is_system=False,
|
|
created_by=actor_id,
|
|
updated_by=actor_id,
|
|
)
|
|
db.session.add(option)
|
|
db.session.flush()
|
|
return option
|
|
|
|
|
|
def mask_phone(phone: str | None) -> str:
|
|
if not phone:
|
|
return "未登记"
|
|
|
|
digits_only = "".join(character for character in phone if character.isdigit())
|
|
if len(digits_only) >= 7:
|
|
return f"{digits_only[:3]}****{digits_only[-4:]}"
|
|
|
|
if len(phone) <= 4:
|
|
return phone
|
|
|
|
return f"{phone[:2]}***{phone[-2:]}"
|
|
|
|
|
|
def household_value_label(field_name: str, value: str | None) -> str:
|
|
if value is None:
|
|
return "-"
|
|
|
|
normalized_value = value.strip()
|
|
if not normalized_value:
|
|
return "-"
|
|
|
|
return HOUSEHOLD_VALUE_LABELS.get(field_name, {}).get(normalized_value, normalized_value)
|
|
|
|
|
|
def suggest_next_household_code() -> str:
|
|
result = db.session.execute(
|
|
db.select(Household.household_code),
|
|
)
|
|
max_numeric_suffix = 0
|
|
for household_code in result.scalars().all():
|
|
if not household_code:
|
|
continue
|
|
|
|
digits = "".join(character for character in household_code if character.isdigit())
|
|
if not digits:
|
|
continue
|
|
|
|
max_numeric_suffix = max(max_numeric_suffix, int(digits))
|
|
|
|
return f"H{max_numeric_suffix + 1:03d}"
|
|
|
|
|
|
def build_new_household_draft(*, household_code: str | None = None) -> Household:
|
|
household = Household()
|
|
household.household_code = household_code or suggest_next_household_code()
|
|
household.head_name = ""
|
|
household.phone = None
|
|
household.side = "groom_side"
|
|
household.invite_status = "not_invited"
|
|
household.attendance_status = "pending"
|
|
household.expected_attendee_count = 0
|
|
household.actual_attendee_count = 0
|
|
household.child_count = 0
|
|
household.red_packet_child_count = 0
|
|
household.total_gift_amount = Decimal("0.00")
|
|
household.favor_status = "not_given"
|
|
household.candy_status = "not_given"
|
|
household.child_red_packet_status = "not_given"
|
|
household.tag_option_ids_json = []
|
|
household.note = None
|
|
household.version = 1
|
|
return household
|
|
|
|
|
|
def is_household_code_taken(household_code: str, *, excluding_household_id: int | None = None) -> bool:
|
|
query = db.select(Household.id).where(
|
|
Household.household_code == household_code,
|
|
)
|
|
if excluding_household_id is not None:
|
|
query = query.where(Household.id != excluding_household_id)
|
|
|
|
return db.session.execute(query.limit(1)).scalar_one_or_none() is not None
|
|
|
|
|
|
def is_head_name_taken(head_name: str, *, excluding_household_id: int | None = None) -> bool:
|
|
query = db.select(Household.id).where(
|
|
Household.deleted_at.is_(None),
|
|
Household.head_name == head_name,
|
|
)
|
|
if excluding_household_id is not None:
|
|
query = query.where(Household.id != excluding_household_id)
|
|
|
|
return db.session.execute(query.limit(1)).scalar_one_or_none() is not None
|
|
|
|
|
|
def serialize_admin_edit_snapshot(household: Household) -> dict[str, object]:
|
|
return {
|
|
"household_code": household.household_code,
|
|
"head_name": household.head_name,
|
|
"phone": household.phone,
|
|
"side": household.side,
|
|
"relation_category_option_id": household.relation_category_option_id,
|
|
"relation_detail_option_id": household.relation_detail_option_id,
|
|
"tag_option_ids_json": sorted(household.tag_option_ids_json or []),
|
|
"invite_status": household.invite_status,
|
|
"attendance_status": household.attendance_status,
|
|
"expected_attendee_count": household.expected_attendee_count,
|
|
"actual_attendee_count": household.actual_attendee_count,
|
|
"child_count": household.child_count,
|
|
"red_packet_child_count": household.red_packet_child_count,
|
|
"total_gift_amount": str(household.total_gift_amount),
|
|
"gift_method_option_id": household.gift_method_option_id,
|
|
"gift_scene_option_id": household.gift_scene_option_id,
|
|
"favor_status": household.favor_status,
|
|
"candy_status": household.candy_status,
|
|
"child_red_packet_status": household.child_red_packet_status,
|
|
"note": household.note,
|
|
"deleted_at": household.deleted_at.isoformat(sep=" ") if household.deleted_at is not None else None,
|
|
"version": household.version,
|
|
}
|
|
|
|
|
|
def serialize_entry_edit_snapshot(household: Household) -> dict[str, object]:
|
|
return {
|
|
"head_name": household.head_name,
|
|
"phone": household.phone,
|
|
"attendance_status": household.attendance_status,
|
|
"actual_attendee_count": household.actual_attendee_count,
|
|
"child_count": household.child_count,
|
|
"red_packet_child_count": household.red_packet_child_count,
|
|
"total_gift_amount": str(household.total_gift_amount),
|
|
"gift_method_option_id": household.gift_method_option_id,
|
|
"gift_scene_option_id": household.gift_scene_option_id,
|
|
"favor_status": household.favor_status,
|
|
"candy_status": household.candy_status,
|
|
"child_red_packet_status": household.child_red_packet_status,
|
|
"note": household.note,
|
|
"version": household.version,
|
|
}
|
|
|
|
|
|
def parse_entry_form(form: dict[str, str], *, valid_method_ids: set[int], valid_scene_ids: set[int]) -> tuple[dict[str, object], list[str]]:
|
|
errors: list[str] = []
|
|
|
|
phone = form.get("phone", "").strip()
|
|
if phone and not PHONE_PATTERN.fullmatch(phone):
|
|
errors.append("联系电话格式不正确,请仅输入数字、空格或常见连接符。")
|
|
|
|
attendance_status = form.get("attendance_status", "").strip()
|
|
if attendance_status not in ATTENDANCE_STATUS_OPTIONS:
|
|
errors.append("到场状态不合法。")
|
|
|
|
favor_status = form.get("favor_status", "").strip()
|
|
if favor_status not in GIVEN_STATUS_OPTIONS:
|
|
errors.append("伴手礼状态不合法。")
|
|
|
|
candy_status = form.get("candy_status", "").strip()
|
|
if candy_status not in GIVEN_STATUS_OPTIONS:
|
|
errors.append("喜糖状态不合法。")
|
|
|
|
child_red_packet_status = form.get("child_red_packet_status", "").strip()
|
|
if child_red_packet_status not in CHILD_RED_PACKET_STATUS_OPTIONS:
|
|
errors.append("儿童红包状态不合法。")
|
|
|
|
actual_attendee_count = _parse_non_negative_int(form.get("actual_attendee_count", "0"), "实际到场人数", errors)
|
|
child_count = _parse_non_negative_int(form.get("child_count", "0"), "儿童人数", errors)
|
|
red_packet_child_count = _parse_non_negative_int(form.get("red_packet_child_count", "0"), "需红包儿童人数", errors)
|
|
total_gift_amount = _parse_non_negative_decimal(form.get("total_gift_amount", "0"), "礼金金额", errors)
|
|
|
|
if red_packet_child_count is not None and child_count is not None and red_packet_child_count > child_count:
|
|
errors.append("需红包儿童人数不能大于儿童人数。")
|
|
|
|
gift_method_option_id = _parse_optional_option_id(
|
|
form.get("gift_method_option_id", ""),
|
|
valid_ids=valid_method_ids,
|
|
field_label="礼金方式",
|
|
errors=errors,
|
|
)
|
|
gift_scene_option_id = _parse_optional_option_id(
|
|
form.get("gift_scene_option_id", ""),
|
|
valid_ids=valid_scene_ids,
|
|
field_label="礼金记录场景",
|
|
errors=errors,
|
|
)
|
|
|
|
if errors:
|
|
return {}, errors
|
|
|
|
return {
|
|
"phone": phone or None,
|
|
"attendance_status": attendance_status,
|
|
"actual_attendee_count": actual_attendee_count,
|
|
"child_count": child_count,
|
|
"red_packet_child_count": red_packet_child_count,
|
|
"total_gift_amount": total_gift_amount,
|
|
"gift_method_option_id": gift_method_option_id,
|
|
"gift_scene_option_id": gift_scene_option_id,
|
|
"favor_status": favor_status,
|
|
"candy_status": candy_status,
|
|
"child_red_packet_status": child_red_packet_status,
|
|
"note": form.get("note", "").strip() or None,
|
|
}, []
|
|
|
|
|
|
def parse_admin_form(
|
|
form: dict[str, str],
|
|
*,
|
|
raw_tag_option_ids: Sequence[str],
|
|
valid_relation_category_ids: set[int],
|
|
relation_detail_parent_map: dict[int, int | None],
|
|
valid_tag_ids: set[int],
|
|
valid_method_ids: set[int],
|
|
valid_scene_ids: set[int],
|
|
require_household_code: bool = True,
|
|
) -> tuple[dict[str, object], list[str]]:
|
|
errors: list[str] = []
|
|
|
|
household_code = form.get("household_code", "").strip()
|
|
if require_household_code and not household_code:
|
|
errors.append("户编码不能为空。")
|
|
|
|
head_name = form.get("head_name", "").strip()
|
|
if not head_name:
|
|
errors.append("户主姓名不能为空。")
|
|
|
|
phone = form.get("phone", "").strip()
|
|
if phone and not PHONE_PATTERN.fullmatch(phone):
|
|
errors.append("联系电话格式不正确,请仅输入数字、空格或常见连接符。")
|
|
|
|
side = form.get("side", "").strip()
|
|
if side not in SIDE_OPTIONS:
|
|
errors.append("所属侧不合法。")
|
|
|
|
invite_status = form.get("invite_status", "").strip()
|
|
if invite_status not in INVITE_STATUS_OPTIONS:
|
|
errors.append("邀请状态不合法。")
|
|
|
|
attendance_status = form.get("attendance_status", "").strip()
|
|
if attendance_status not in ATTENDANCE_STATUS_OPTIONS:
|
|
errors.append("到场状态不合法。")
|
|
|
|
favor_status = form.get("favor_status", "").strip()
|
|
if favor_status not in GIVEN_STATUS_OPTIONS:
|
|
errors.append("伴手礼状态不合法。")
|
|
|
|
candy_status = form.get("candy_status", "").strip()
|
|
if candy_status not in GIVEN_STATUS_OPTIONS:
|
|
errors.append("喜糖状态不合法。")
|
|
|
|
child_red_packet_status = form.get("child_red_packet_status", "").strip()
|
|
if child_red_packet_status not in CHILD_RED_PACKET_STATUS_OPTIONS:
|
|
errors.append("儿童红包状态不合法。")
|
|
|
|
expected_attendee_count = _parse_non_negative_int(form.get("expected_attendee_count", "0"), "预计人数", errors)
|
|
actual_attendee_count = _parse_non_negative_int(form.get("actual_attendee_count", "0"), "实际到场人数", errors)
|
|
child_count = _parse_non_negative_int(form.get("child_count", "0"), "儿童人数", errors)
|
|
red_packet_child_count = _parse_non_negative_int(form.get("red_packet_child_count", "0"), "需红包儿童人数", errors)
|
|
total_gift_amount = _parse_non_negative_decimal(form.get("total_gift_amount", "0"), "礼金金额", errors)
|
|
|
|
if red_packet_child_count is not None and child_count is not None and red_packet_child_count > child_count:
|
|
errors.append("需红包儿童人数不能大于儿童人数。")
|
|
|
|
relation_category_option_id = _parse_optional_option_id(
|
|
form.get("relation_category_option_id", ""),
|
|
valid_ids=valid_relation_category_ids,
|
|
field_label="关系分类",
|
|
errors=errors,
|
|
)
|
|
relation_detail_option_id: int | None = None
|
|
if relation_detail_parent_map:
|
|
relation_detail_option_id = _parse_optional_option_id(
|
|
form.get("relation_detail_option_id", ""),
|
|
valid_ids=set(relation_detail_parent_map),
|
|
field_label="具体关系",
|
|
errors=errors,
|
|
)
|
|
if relation_detail_option_id is not None:
|
|
expected_parent_id = relation_detail_parent_map.get(relation_detail_option_id)
|
|
if relation_category_option_id is None or expected_parent_id != relation_category_option_id:
|
|
errors.append("具体关系与关系分类不匹配。")
|
|
|
|
tag_option_ids = _parse_tag_option_ids(raw_tag_option_ids, valid_ids=valid_tag_ids, errors=errors)
|
|
|
|
gift_method_option_id = _parse_optional_option_id(
|
|
form.get("gift_method_option_id", ""),
|
|
valid_ids=valid_method_ids,
|
|
field_label="礼金方式",
|
|
errors=errors,
|
|
)
|
|
gift_scene_option_id = _parse_optional_option_id(
|
|
form.get("gift_scene_option_id", ""),
|
|
valid_ids=valid_scene_ids,
|
|
field_label="礼金记录场景",
|
|
errors=errors,
|
|
)
|
|
|
|
if errors:
|
|
return {}, errors
|
|
|
|
return {
|
|
"household_code": household_code or None,
|
|
"head_name": head_name,
|
|
"phone": phone or None,
|
|
"side": side,
|
|
"relation_category_option_id": relation_category_option_id,
|
|
"relation_detail_option_id": relation_detail_option_id,
|
|
"tag_option_ids_json": tag_option_ids,
|
|
"invite_status": invite_status,
|
|
"attendance_status": attendance_status,
|
|
"expected_attendee_count": expected_attendee_count,
|
|
"actual_attendee_count": actual_attendee_count,
|
|
"child_count": child_count,
|
|
"red_packet_child_count": red_packet_child_count,
|
|
"total_gift_amount": total_gift_amount,
|
|
"gift_method_option_id": gift_method_option_id,
|
|
"gift_scene_option_id": gift_scene_option_id,
|
|
"favor_status": favor_status,
|
|
"candy_status": candy_status,
|
|
"child_red_packet_status": child_red_packet_status,
|
|
"note": form.get("note", "").strip() or None,
|
|
}, []
|
|
|
|
|
|
def _parse_non_negative_int(raw_value: str, field_label: str, errors: list[str]) -> int | None:
|
|
value = raw_value.strip()
|
|
try:
|
|
parsed = int(value)
|
|
except ValueError:
|
|
errors.append(f"{field_label}必须是非负整数。")
|
|
return None
|
|
|
|
if parsed < 0:
|
|
errors.append(f"{field_label}必须是非负整数。")
|
|
return None
|
|
|
|
return parsed
|
|
|
|
|
|
def _parse_non_negative_decimal(raw_value: str, field_label: str, errors: list[str]) -> Decimal | None:
|
|
value = raw_value.strip()
|
|
try:
|
|
parsed = Decimal(value)
|
|
except InvalidOperation:
|
|
errors.append(f"{field_label}必须是合法金额。")
|
|
return None
|
|
|
|
if parsed < 0:
|
|
errors.append(f"{field_label}不能为负数。")
|
|
return None
|
|
|
|
return parsed.quantize(Decimal("0.01"))
|
|
|
|
|
|
def _parse_optional_option_id(raw_value: str, *, valid_ids: set[int], field_label: str, errors: list[str]) -> int | None:
|
|
value = raw_value.strip()
|
|
if not value:
|
|
return None
|
|
|
|
try:
|
|
parsed = int(value)
|
|
except ValueError:
|
|
errors.append(f"{field_label}选项不合法。")
|
|
return None
|
|
|
|
if parsed not in valid_ids:
|
|
errors.append(f"{field_label}选项不合法。")
|
|
return None
|
|
|
|
return parsed
|
|
|
|
|
|
def _parse_tag_option_ids(raw_values: Sequence[str], *, valid_ids: set[int], errors: list[str]) -> list[int]:
|
|
parsed_values: list[int] = []
|
|
|
|
for raw_value in raw_values:
|
|
value = raw_value.strip()
|
|
if not value:
|
|
continue
|
|
|
|
try:
|
|
parsed = int(value)
|
|
except ValueError:
|
|
errors.append("标签选项不合法。")
|
|
continue
|
|
|
|
if parsed not in valid_ids:
|
|
errors.append("标签选项不合法。")
|
|
continue
|
|
|
|
if parsed not in parsed_values:
|
|
parsed_values.append(parsed)
|
|
|
|
return sorted(parsed_values)
|
|
|
|
|
|
def normalize_search_term(search_term: str | None) -> str:
|
|
if not search_term:
|
|
return ""
|
|
|
|
return "".join(search_term.strip().lower().split())
|
|
|
|
|
|
def _normalize_text(value: str | None) -> str:
|
|
if not value:
|
|
return ""
|
|
|
|
return "".join(value.strip().lower().split())
|
|
|
|
|
|
def _name_search_tokens(name: str | None) -> tuple[str, str]:
|
|
normalized_name = _normalize_text(name)
|
|
if not normalized_name:
|
|
return "", ""
|
|
|
|
if Style is None:
|
|
fallback_tokens = lazy_pinyin(normalized_name)
|
|
fallback_value = "".join(fallback_tokens).lower()
|
|
return fallback_value, fallback_value
|
|
|
|
full_pinyin = "".join(lazy_pinyin(normalized_name, style=Style.NORMAL)).lower()
|
|
initials = "".join(lazy_pinyin(normalized_name, style=Style.FIRST_LETTER)).lower()
|
|
return full_pinyin, initials
|
|
|
|
|
|
def _household_matches_search(household: Household, normalized_term: str) -> bool:
|
|
direct_values = [
|
|
household.head_name,
|
|
household.note,
|
|
]
|
|
if any(normalized_term in _normalize_text(value) for value in direct_values):
|
|
return True
|
|
|
|
tag_label_map = {
|
|
option.id: option.option_label
|
|
for option in list_enabled_options("tag")
|
|
}
|
|
if any(
|
|
normalized_term in _normalize_text(tag_label_map.get(tag_id))
|
|
for tag_id in (household.tag_option_ids_json or [])
|
|
):
|
|
return True
|
|
|
|
full_pinyin, initials = _name_search_tokens(household.head_name)
|
|
if normalized_term in full_pinyin or normalized_term in initials:
|
|
return True
|
|
|
|
for member in household.members:
|
|
if normalized_term in _normalize_text(member.name):
|
|
return True
|
|
|
|
member_full_pinyin, member_initials = _name_search_tokens(member.name)
|
|
if normalized_term in member_full_pinyin or normalized_term in member_initials:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def _normalize_sort(sort_by: str | None, sort_order: str | None) -> tuple[str, str]:
|
|
normalized_sort_by = (sort_by or "updated_at").strip()
|
|
if normalized_sort_by not in SORT_FIELD_OPTIONS:
|
|
normalized_sort_by = "updated_at"
|
|
|
|
normalized_sort_order = (sort_order or "desc").strip()
|
|
if normalized_sort_order not in SORT_ORDER_OPTIONS:
|
|
normalized_sort_order = "desc"
|
|
|
|
return normalized_sort_by, normalized_sort_order
|
|
|
|
|
|
def _database_sort_covers(sort_by: str | None) -> bool:
|
|
normalized_sort_by, _ = _normalize_sort(sort_by, None)
|
|
return normalized_sort_by != "head_name"
|
|
|
|
|
|
def _apply_database_sort(
|
|
query: Select[tuple[Household]],
|
|
*,
|
|
sort_by: str | None,
|
|
sort_order: str | None,
|
|
) -> Select[tuple[Household]]:
|
|
normalized_sort_by, normalized_sort_order = _normalize_sort(sort_by, sort_order)
|
|
if normalized_sort_by == "head_name":
|
|
return query
|
|
|
|
if normalized_sort_by == "expected_attendee_count":
|
|
sort_column = Household.expected_attendee_count
|
|
elif normalized_sort_by == "actual_attendee_count":
|
|
sort_column = Household.actual_attendee_count
|
|
elif normalized_sort_by == "child_count":
|
|
sort_column = Household.child_count
|
|
elif normalized_sort_by == "total_gift_amount":
|
|
sort_column = Household.total_gift_amount
|
|
else:
|
|
sort_column = Household.updated_at
|
|
|
|
ordered_column = sort_column.asc() if normalized_sort_order == "asc" else sort_column.desc()
|
|
ordered_id = Household.id.asc() if normalized_sort_order == "asc" else Household.id.desc()
|
|
return query.order_by(ordered_column, ordered_id)
|
|
|
|
|
|
def _sort_households(households: Sequence[Household], *, sort_by: str | None, sort_order: str | None) -> list[Household]:
|
|
normalized_sort_by, normalized_sort_order = _normalize_sort(sort_by, sort_order)
|
|
reverse = normalized_sort_order == "desc"
|
|
|
|
def sort_key(household: Household) -> tuple[object, int]:
|
|
if normalized_sort_by == "head_name":
|
|
name_sort_key, _ = _name_search_tokens(household.head_name)
|
|
primary_value: object = name_sort_key or _normalize_text(household.head_name)
|
|
elif normalized_sort_by == "expected_attendee_count":
|
|
primary_value = household.expected_attendee_count
|
|
elif normalized_sort_by == "actual_attendee_count":
|
|
primary_value = household.actual_attendee_count
|
|
elif normalized_sort_by == "child_count":
|
|
primary_value = household.child_count
|
|
elif normalized_sort_by == "total_gift_amount":
|
|
primary_value = household.total_gift_amount
|
|
else:
|
|
primary_value = household.updated_at or datetime.min
|
|
|
|
return primary_value, household.id
|
|
|
|
return sorted(households, key=sort_key, reverse=reverse)
|
|
|
|
|
|
def slugify_option_label(label: str) -> str:
|
|
normalized = _normalize_text(label)
|
|
ascii_tokens = "".join(lazy_pinyin(normalized)).lower()
|
|
slug = re.sub(r"[^a-z0-9]+", "-", ascii_tokens).strip("-")
|
|
if slug:
|
|
return slug[:48]
|
|
fallback = re.sub(r"[^a-z0-9]+", "-", normalized).strip("-")
|
|
return (fallback or "tag")[:48]
|
|
|