You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

954 lines
33 KiB

from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal, InvalidOperation
import re
from sqlalchemy import Select, func
from sqlalchemy.orm import selectinload
try:
from pypinyin import Style, lazy_pinyin
except ImportError: # pragma: no cover - dependency is installed in runtime verification
Style = None
def _lazy_pinyin_fallback(value: str, style: object | None = None) -> list[str]:
_ = style
return [value]
lazy_pinyin = _lazy_pinyin_fallback
from app.extensions import db
from app.models import Household, OptionItem
ATTENDANCE_STATUS_OPTIONS = ("pending", "attending", "absent", "partial")
INVITE_STATUS_OPTIONS = ("not_invited", "invited", "confirmed", "declined")
GIVEN_STATUS_OPTIONS = ("not_given", "given")
CHILD_RED_PACKET_STATUS_OPTIONS = ("not_given", "partial", "given")
SIDE_OPTIONS = ("groom_side", "bride_side", "both_side")
SORT_FIELD_OPTIONS = (
"updated_at",
"head_name",
"expected_attendee_count",
"actual_attendee_count",
"child_count",
"total_gift_amount",
)
SORT_ORDER_OPTIONS = ("asc", "desc")
HOMEPAGE_PER_PAGE_OPTIONS = (10, 20, 50)
DEFAULT_HOMEPAGE_PER_PAGE = 10
SIDE_LABELS = {
"groom_side": "男方",
"bride_side": "女方",
"both_side": "双方共有",
}
INVITE_STATUS_LABELS = {
"not_invited": "未邀请",
"invited": "已邀请",
"confirmed": "已确认",
"declined": "已婉拒",
}
ATTENDANCE_STATUS_LABELS = {
"pending": "待确认",
"attending": "到场",
"absent": "缺席",
"partial": "部分到场",
}
GIVEN_STATUS_LABELS = {
"not_given": "未发",
"given": "已发",
}
CHILD_RED_PACKET_STATUS_LABELS = {
"not_given": "未发",
"partial": "部分发放",
"given": "已发",
}
HOUSEHOLD_VALUE_LABELS = {
"side": SIDE_LABELS,
"invite_status": INVITE_STATUS_LABELS,
"attendance_status": ATTENDANCE_STATUS_LABELS,
"favor_status": GIVEN_STATUS_LABELS,
"candy_status": GIVEN_STATUS_LABELS,
"child_red_packet_status": CHILD_RED_PACKET_STATUS_LABELS,
}
SORT_FIELD_LABELS = {
"updated_at": "最近更新时间",
"head_name": "户主姓名",
"expected_attendee_count": "预计人数",
"actual_attendee_count": "实际到场人数",
"child_count": "儿童人数",
"total_gift_amount": "礼金金额",
}
ADMIN_EDITABLE_FIELDS = (
"household_code",
"head_name",
"phone",
"side",
"relation_category_option_id",
"relation_detail_option_id",
"tag_option_ids_json",
"invite_status",
"attendance_status",
"expected_attendee_count",
"actual_attendee_count",
"child_count",
"red_packet_child_count",
"total_gift_amount",
"gift_method_option_id",
"gift_scene_option_id",
"favor_status",
"candy_status",
"child_red_packet_status",
"note",
)
ENTRY_EDITABLE_FIELDS = (
"head_name",
"phone",
"attendance_status",
"actual_attendee_count",
"child_count",
"red_packet_child_count",
"total_gift_amount",
"gift_method_option_id",
"gift_scene_option_id",
"favor_status",
"candy_status",
"child_red_packet_status",
"note",
)
PHONE_PATTERN = re.compile(r"^[0-9+\-\s]{6,32}$")
@dataclass(frozen=True)
class HouseholdHomepagePage:
items: Sequence[Household]
filtered_households: Sequence[Household]
filtered_count: int
page: int
per_page: int
total_pages: int
has_prev: bool
has_next: bool
page_start: int
page_end: int
def household_query(
_search_term: str | None = None,
*,
side: str | None = None,
invite_status: str | None = None,
attendance_status: str | None = None,
tag_option_id: int | None = None,
sort_by: str | None = None,
sort_order: str | None = None,
) -> Select[tuple[Household]]:
query = (
db.select(Household)
.where(Household.deleted_at.is_(None))
.options(
selectinload(Household.relation_category_option),
selectinload(Household.relation_detail_option),
selectinload(Household.gift_method_option),
selectinload(Household.gift_scene_option),
selectinload(Household.members),
selectinload(Household.updated_by_account),
)
)
normalized_side = (side or "").strip()
if normalized_side in SIDE_OPTIONS:
query = query.where(Household.side == normalized_side)
normalized_invite_status = (invite_status or "").strip()
if normalized_invite_status in INVITE_STATUS_OPTIONS:
query = query.where(Household.invite_status == normalized_invite_status)
normalized_attendance_status = (attendance_status or "").strip()
if normalized_attendance_status in ATTENDANCE_STATUS_OPTIONS:
query = query.where(Household.attendance_status == normalized_attendance_status)
if tag_option_id is not None:
query = query.where(Household.tag_option_ids_json.is_not(None))
query = _apply_database_sort(query, sort_by=sort_by, sort_order=sort_order)
return query
def list_households(
*,
search_term: str | None = None,
side: str | None = None,
invite_status: str | None = None,
attendance_status: str | None = None,
tag_option_id: int | None = None,
sort_by: str | None = None,
sort_order: str | None = None,
limit: int | None = 50,
) -> Sequence[Household]:
result = db.session.execute(
household_query(
search_term,
side=side,
invite_status=invite_status,
attendance_status=attendance_status,
tag_option_id=tag_option_id,
sort_by=sort_by,
sort_order=sort_order,
),
)
households = result.scalars().all()
normalized_term = normalize_search_term(search_term)
if normalized_term:
households = [household for household in households if _household_matches_search(household, normalized_term)]
if tag_option_id is not None:
households = [household for household in households if tag_option_id in (household.tag_option_ids_json or [])]
if not _database_sort_covers(sort_by):
households = _sort_households(households, sort_by=sort_by, sort_order=sort_order)
if limit is None:
return households
return households[:limit]
def paginate_households_for_homepage(
*,
search_term: str | None = None,
side: str | None = None,
invite_status: str | None = None,
attendance_status: str | None = None,
tag_option_id: int | None = None,
sort_by: str | None = None,
sort_order: str | None = None,
page: int = 1,
per_page: int = DEFAULT_HOMEPAGE_PER_PAGE,
) -> HouseholdHomepagePage:
normalized_per_page = per_page if per_page in HOMEPAGE_PER_PAGE_OPTIONS else DEFAULT_HOMEPAGE_PER_PAGE
filtered_households = list_households(
search_term=search_term,
side=side,
invite_status=invite_status,
attendance_status=attendance_status,
tag_option_id=tag_option_id,
sort_by=sort_by,
sort_order=sort_order,
limit=None,
)
filtered_count = len(filtered_households)
total_pages = max(1, (filtered_count + normalized_per_page - 1) // normalized_per_page)
normalized_page = min(max(page, 1), total_pages)
start_index = (normalized_page - 1) * normalized_per_page
end_index = start_index + normalized_per_page
page_items = filtered_households[start_index:end_index]
page_start = start_index + 1 if filtered_count else 0
page_end = min(end_index, filtered_count) if filtered_count else 0
return HouseholdHomepagePage(
items=page_items,
filtered_households=filtered_households,
filtered_count=filtered_count,
page=normalized_page,
per_page=normalized_per_page,
total_pages=total_pages,
has_prev=normalized_page > 1,
has_next=normalized_page < total_pages,
page_start=page_start,
page_end=page_end,
)
def get_household_or_none(household_id: int) -> Household | None:
result = db.session.execute(
household_query().where(Household.id == household_id),
)
return result.scalar_one_or_none()
def get_homepage_stats() -> dict[str, int | Decimal]:
summary = db.session.execute(
db.select(
func.count(Household.id),
func.coalesce(func.sum(Household.expected_attendee_count), 0),
func.coalesce(func.sum(Household.child_count), 0),
func.coalesce(func.sum(Household.total_gift_amount), Decimal("0.00")),
).where(Household.deleted_at.is_(None)),
).one()
return {
"total_households": int(summary[0] or 0),
"total_expected_attendees": int(summary[1] or 0),
"total_children": int(summary[2] or 0),
"total_gift_amount": summary[3] or Decimal("0.00"),
}
def get_filtered_homepage_stats(households: Sequence[Household]) -> dict[str, int | Decimal]:
total_expected_attendees = sum(household.expected_attendee_count for household in households)
total_children = sum(household.child_count for household in households)
total_gift_amount = sum((household.total_gift_amount for household in households), start=Decimal("0.00"))
total_attending_households = sum(1 for household in households if household.attendance_status in {"attending", "partial"})
return {
"total_households": len(households),
"total_expected_attendees": total_expected_attendees,
"total_children": total_children,
"total_gift_amount": total_gift_amount,
"total_attending_households": total_attending_households,
}
def list_enabled_options(option_group: str) -> Sequence[OptionItem]:
result = db.session.execute(
db.select(OptionItem)
.where(
OptionItem.option_group == option_group,
OptionItem.is_enabled.is_(True),
)
.order_by(OptionItem.sort_order.asc(), OptionItem.id.asc()),
)
return result.scalars().all()
def parse_new_tag_labels(raw_value: str | None) -> tuple[list[str], list[str]]:
if not raw_value:
return [], []
normalized_labels: list[str] = []
errors: list[str] = []
for chunk in re.split(r"[\n,,]+", raw_value):
label = chunk.strip()
if not label:
continue
if len(label) > 32:
errors.append(f"标签“{label[:12]}...”过长,请控制在 32 个字符内。")
continue
if label not in normalized_labels:
normalized_labels.append(label)
return normalized_labels, errors
def parse_new_relation_category_label(raw_value: str | None) -> tuple[str | None, list[str]]:
if not raw_value:
return None, []
labels, errors = parse_new_tag_labels(raw_value)
if len(labels) > 1:
errors.append("一次只能新增一个关系分类。")
return None, errors
return (labels[0] if labels else None), errors
def ensure_tag_options(labels: Sequence[str], *, actor_id: int | None = None) -> list[OptionItem]:
if not labels:
return []
existing_options = list_enabled_options("tag")
existing_by_label = {option.option_label: option for option in existing_options}
sort_order_seed = max((option.sort_order for option in existing_options), default=0)
ensured: list[OptionItem] = []
for index, label in enumerate(labels, start=1):
existing = existing_by_label.get(label)
if existing is not None:
ensured.append(existing)
continue
option_code = slugify_option_label(label)
candidate_code = option_code
suffix = 2
while db.session.execute(
db.select(OptionItem.id).where(
OptionItem.option_group == "tag",
OptionItem.option_code == candidate_code,
).limit(1)
).scalar_one_or_none() is not None:
candidate_code = f"{option_code}-{suffix}"
suffix += 1
option = OptionItem(
option_group="tag",
option_code=candidate_code,
option_label=label,
sort_order=sort_order_seed + index,
is_enabled=True,
is_system=False,
created_by=actor_id,
updated_by=actor_id,
)
db.session.add(option)
db.session.flush()
existing_by_label[label] = option
ensured.append(option)
return ensured
def ensure_relation_category_option(label: str | None, *, actor_id: int | None = None) -> OptionItem | None:
if not label:
return None
existing_options = list_enabled_options("relation_category")
for option in existing_options:
if option.option_label == label:
return option
option_code = slugify_option_label(label)
candidate_code = option_code
suffix = 2
while db.session.execute(
db.select(OptionItem.id).where(
OptionItem.option_group == "relation_category",
OptionItem.option_code == candidate_code,
).limit(1)
).scalar_one_or_none() is not None:
candidate_code = f"{option_code}-{suffix}"
suffix += 1
sort_order_seed = max((option.sort_order for option in existing_options), default=0)
option = OptionItem(
option_group="relation_category",
option_code=candidate_code,
option_label=label,
sort_order=sort_order_seed + 10,
is_enabled=True,
is_system=False,
created_by=actor_id,
updated_by=actor_id,
)
db.session.add(option)
db.session.flush()
return option
def mask_phone(phone: str | None) -> str:
if not phone:
return "未登记"
digits_only = "".join(character for character in phone if character.isdigit())
if len(digits_only) >= 7:
return f"{digits_only[:3]}****{digits_only[-4:]}"
if len(phone) <= 4:
return phone
return f"{phone[:2]}***{phone[-2:]}"
def household_value_label(field_name: str, value: str | None) -> str:
if value is None:
return "-"
normalized_value = value.strip()
if not normalized_value:
return "-"
return HOUSEHOLD_VALUE_LABELS.get(field_name, {}).get(normalized_value, normalized_value)
def suggest_next_household_code() -> str:
result = db.session.execute(
db.select(Household.household_code),
)
max_numeric_suffix = 0
for household_code in result.scalars().all():
if not household_code:
continue
digits = "".join(character for character in household_code if character.isdigit())
if not digits:
continue
max_numeric_suffix = max(max_numeric_suffix, int(digits))
return f"H{max_numeric_suffix + 1:03d}"
def build_new_household_draft(*, household_code: str | None = None) -> Household:
household = Household()
household.household_code = household_code or suggest_next_household_code()
household.head_name = ""
household.phone = None
household.side = "groom_side"
household.invite_status = "not_invited"
household.attendance_status = "pending"
household.expected_attendee_count = 0
household.actual_attendee_count = 0
household.child_count = 0
household.red_packet_child_count = 0
household.total_gift_amount = Decimal("0.00")
household.favor_status = "not_given"
household.candy_status = "not_given"
household.child_red_packet_status = "not_given"
household.tag_option_ids_json = []
household.note = None
household.version = 1
return household
def is_household_code_taken(household_code: str, *, excluding_household_id: int | None = None) -> bool:
query = db.select(Household.id).where(
Household.household_code == household_code,
)
if excluding_household_id is not None:
query = query.where(Household.id != excluding_household_id)
return db.session.execute(query.limit(1)).scalar_one_or_none() is not None
def is_head_name_taken(head_name: str, *, excluding_household_id: int | None = None) -> bool:
query = db.select(Household.id).where(
Household.deleted_at.is_(None),
Household.head_name == head_name,
)
if excluding_household_id is not None:
query = query.where(Household.id != excluding_household_id)
return db.session.execute(query.limit(1)).scalar_one_or_none() is not None
def serialize_admin_edit_snapshot(household: Household) -> dict[str, object]:
return {
"household_code": household.household_code,
"head_name": household.head_name,
"phone": household.phone,
"side": household.side,
"relation_category_option_id": household.relation_category_option_id,
"relation_detail_option_id": household.relation_detail_option_id,
"tag_option_ids_json": sorted(household.tag_option_ids_json or []),
"invite_status": household.invite_status,
"attendance_status": household.attendance_status,
"expected_attendee_count": household.expected_attendee_count,
"actual_attendee_count": household.actual_attendee_count,
"child_count": household.child_count,
"red_packet_child_count": household.red_packet_child_count,
"total_gift_amount": str(household.total_gift_amount),
"gift_method_option_id": household.gift_method_option_id,
"gift_scene_option_id": household.gift_scene_option_id,
"favor_status": household.favor_status,
"candy_status": household.candy_status,
"child_red_packet_status": household.child_red_packet_status,
"note": household.note,
"deleted_at": household.deleted_at.isoformat(sep=" ") if household.deleted_at is not None else None,
"version": household.version,
}
def serialize_entry_edit_snapshot(household: Household) -> dict[str, object]:
return {
"head_name": household.head_name,
"phone": household.phone,
"attendance_status": household.attendance_status,
"actual_attendee_count": household.actual_attendee_count,
"child_count": household.child_count,
"red_packet_child_count": household.red_packet_child_count,
"total_gift_amount": str(household.total_gift_amount),
"gift_method_option_id": household.gift_method_option_id,
"gift_scene_option_id": household.gift_scene_option_id,
"favor_status": household.favor_status,
"candy_status": household.candy_status,
"child_red_packet_status": household.child_red_packet_status,
"note": household.note,
"version": household.version,
}
def parse_entry_form(form: dict[str, str], *, valid_method_ids: set[int], valid_scene_ids: set[int]) -> tuple[dict[str, object], list[str]]:
errors: list[str] = []
phone = form.get("phone", "").strip()
if phone and not PHONE_PATTERN.fullmatch(phone):
errors.append("联系电话格式不正确,请仅输入数字、空格或常见连接符。")
attendance_status = form.get("attendance_status", "").strip()
if attendance_status not in ATTENDANCE_STATUS_OPTIONS:
errors.append("到场状态不合法。")
favor_status = form.get("favor_status", "").strip()
if favor_status not in GIVEN_STATUS_OPTIONS:
errors.append("伴手礼状态不合法。")
candy_status = form.get("candy_status", "").strip()
if candy_status not in GIVEN_STATUS_OPTIONS:
errors.append("喜糖状态不合法。")
child_red_packet_status = form.get("child_red_packet_status", "").strip()
if child_red_packet_status not in CHILD_RED_PACKET_STATUS_OPTIONS:
errors.append("儿童红包状态不合法。")
actual_attendee_count = _parse_non_negative_int(form.get("actual_attendee_count", "0"), "实际到场人数", errors)
child_count = _parse_non_negative_int(form.get("child_count", "0"), "儿童人数", errors)
red_packet_child_count = _parse_non_negative_int(form.get("red_packet_child_count", "0"), "需红包儿童人数", errors)
total_gift_amount = _parse_non_negative_decimal(form.get("total_gift_amount", "0"), "礼金金额", errors)
if red_packet_child_count is not None and child_count is not None and red_packet_child_count > child_count:
errors.append("需红包儿童人数不能大于儿童人数。")
gift_method_option_id = _parse_optional_option_id(
form.get("gift_method_option_id", ""),
valid_ids=valid_method_ids,
field_label="礼金方式",
errors=errors,
)
gift_scene_option_id = _parse_optional_option_id(
form.get("gift_scene_option_id", ""),
valid_ids=valid_scene_ids,
field_label="礼金记录场景",
errors=errors,
)
if errors:
return {}, errors
return {
"phone": phone or None,
"attendance_status": attendance_status,
"actual_attendee_count": actual_attendee_count,
"child_count": child_count,
"red_packet_child_count": red_packet_child_count,
"total_gift_amount": total_gift_amount,
"gift_method_option_id": gift_method_option_id,
"gift_scene_option_id": gift_scene_option_id,
"favor_status": favor_status,
"candy_status": candy_status,
"child_red_packet_status": child_red_packet_status,
"note": form.get("note", "").strip() or None,
}, []
def parse_admin_form(
form: dict[str, str],
*,
raw_tag_option_ids: Sequence[str],
valid_relation_category_ids: set[int],
relation_detail_parent_map: dict[int, int | None],
valid_tag_ids: set[int],
valid_method_ids: set[int],
valid_scene_ids: set[int],
require_household_code: bool = True,
) -> tuple[dict[str, object], list[str]]:
errors: list[str] = []
household_code = form.get("household_code", "").strip()
if require_household_code and not household_code:
errors.append("户编码不能为空。")
head_name = form.get("head_name", "").strip()
if not head_name:
errors.append("户主姓名不能为空。")
phone = form.get("phone", "").strip()
if phone and not PHONE_PATTERN.fullmatch(phone):
errors.append("联系电话格式不正确,请仅输入数字、空格或常见连接符。")
side = form.get("side", "").strip()
if side not in SIDE_OPTIONS:
errors.append("所属侧不合法。")
invite_status = form.get("invite_status", "").strip()
if invite_status not in INVITE_STATUS_OPTIONS:
errors.append("邀请状态不合法。")
attendance_status = form.get("attendance_status", "").strip()
if attendance_status not in ATTENDANCE_STATUS_OPTIONS:
errors.append("到场状态不合法。")
favor_status = form.get("favor_status", "").strip()
if favor_status not in GIVEN_STATUS_OPTIONS:
errors.append("伴手礼状态不合法。")
candy_status = form.get("candy_status", "").strip()
if candy_status not in GIVEN_STATUS_OPTIONS:
errors.append("喜糖状态不合法。")
child_red_packet_status = form.get("child_red_packet_status", "").strip()
if child_red_packet_status not in CHILD_RED_PACKET_STATUS_OPTIONS:
errors.append("儿童红包状态不合法。")
expected_attendee_count = _parse_non_negative_int(form.get("expected_attendee_count", "0"), "预计人数", errors)
actual_attendee_count = _parse_non_negative_int(form.get("actual_attendee_count", "0"), "实际到场人数", errors)
child_count = _parse_non_negative_int(form.get("child_count", "0"), "儿童人数", errors)
red_packet_child_count = _parse_non_negative_int(form.get("red_packet_child_count", "0"), "需红包儿童人数", errors)
total_gift_amount = _parse_non_negative_decimal(form.get("total_gift_amount", "0"), "礼金金额", errors)
if red_packet_child_count is not None and child_count is not None and red_packet_child_count > child_count:
errors.append("需红包儿童人数不能大于儿童人数。")
relation_category_option_id = _parse_optional_option_id(
form.get("relation_category_option_id", ""),
valid_ids=valid_relation_category_ids,
field_label="关系分类",
errors=errors,
)
relation_detail_option_id: int | None = None
if relation_detail_parent_map:
relation_detail_option_id = _parse_optional_option_id(
form.get("relation_detail_option_id", ""),
valid_ids=set(relation_detail_parent_map),
field_label="具体关系",
errors=errors,
)
if relation_detail_option_id is not None:
expected_parent_id = relation_detail_parent_map.get(relation_detail_option_id)
if relation_category_option_id is None or expected_parent_id != relation_category_option_id:
errors.append("具体关系与关系分类不匹配。")
tag_option_ids = _parse_tag_option_ids(raw_tag_option_ids, valid_ids=valid_tag_ids, errors=errors)
gift_method_option_id = _parse_optional_option_id(
form.get("gift_method_option_id", ""),
valid_ids=valid_method_ids,
field_label="礼金方式",
errors=errors,
)
gift_scene_option_id = _parse_optional_option_id(
form.get("gift_scene_option_id", ""),
valid_ids=valid_scene_ids,
field_label="礼金记录场景",
errors=errors,
)
if errors:
return {}, errors
return {
"household_code": household_code or None,
"head_name": head_name,
"phone": phone or None,
"side": side,
"relation_category_option_id": relation_category_option_id,
"relation_detail_option_id": relation_detail_option_id,
"tag_option_ids_json": tag_option_ids,
"invite_status": invite_status,
"attendance_status": attendance_status,
"expected_attendee_count": expected_attendee_count,
"actual_attendee_count": actual_attendee_count,
"child_count": child_count,
"red_packet_child_count": red_packet_child_count,
"total_gift_amount": total_gift_amount,
"gift_method_option_id": gift_method_option_id,
"gift_scene_option_id": gift_scene_option_id,
"favor_status": favor_status,
"candy_status": candy_status,
"child_red_packet_status": child_red_packet_status,
"note": form.get("note", "").strip() or None,
}, []
def _parse_non_negative_int(raw_value: str, field_label: str, errors: list[str]) -> int | None:
value = raw_value.strip()
try:
parsed = int(value)
except ValueError:
errors.append(f"{field_label}必须是非负整数。")
return None
if parsed < 0:
errors.append(f"{field_label}必须是非负整数。")
return None
return parsed
def _parse_non_negative_decimal(raw_value: str, field_label: str, errors: list[str]) -> Decimal | None:
value = raw_value.strip()
try:
parsed = Decimal(value)
except InvalidOperation:
errors.append(f"{field_label}必须是合法金额。")
return None
if parsed < 0:
errors.append(f"{field_label}不能为负数。")
return None
return parsed.quantize(Decimal("0.01"))
def _parse_optional_option_id(raw_value: str, *, valid_ids: set[int], field_label: str, errors: list[str]) -> int | None:
value = raw_value.strip()
if not value:
return None
try:
parsed = int(value)
except ValueError:
errors.append(f"{field_label}选项不合法。")
return None
if parsed not in valid_ids:
errors.append(f"{field_label}选项不合法。")
return None
return parsed
def _parse_tag_option_ids(raw_values: Sequence[str], *, valid_ids: set[int], errors: list[str]) -> list[int]:
parsed_values: list[int] = []
for raw_value in raw_values:
value = raw_value.strip()
if not value:
continue
try:
parsed = int(value)
except ValueError:
errors.append("标签选项不合法。")
continue
if parsed not in valid_ids:
errors.append("标签选项不合法。")
continue
if parsed not in parsed_values:
parsed_values.append(parsed)
return sorted(parsed_values)
def normalize_search_term(search_term: str | None) -> str:
if not search_term:
return ""
return "".join(search_term.strip().lower().split())
def _normalize_text(value: str | None) -> str:
if not value:
return ""
return "".join(value.strip().lower().split())
def _name_search_tokens(name: str | None) -> tuple[str, str]:
normalized_name = _normalize_text(name)
if not normalized_name:
return "", ""
if Style is None:
fallback_tokens = lazy_pinyin(normalized_name)
fallback_value = "".join(fallback_tokens).lower()
return fallback_value, fallback_value
full_pinyin = "".join(lazy_pinyin(normalized_name, style=Style.NORMAL)).lower()
initials = "".join(lazy_pinyin(normalized_name, style=Style.FIRST_LETTER)).lower()
return full_pinyin, initials
def _household_matches_search(household: Household, normalized_term: str) -> bool:
direct_values = [
household.head_name,
household.note,
]
if any(normalized_term in _normalize_text(value) for value in direct_values):
return True
tag_label_map = {
option.id: option.option_label
for option in list_enabled_options("tag")
}
if any(
normalized_term in _normalize_text(tag_label_map.get(tag_id))
for tag_id in (household.tag_option_ids_json or [])
):
return True
full_pinyin, initials = _name_search_tokens(household.head_name)
if normalized_term in full_pinyin or normalized_term in initials:
return True
for member in household.members:
if normalized_term in _normalize_text(member.name):
return True
member_full_pinyin, member_initials = _name_search_tokens(member.name)
if normalized_term in member_full_pinyin or normalized_term in member_initials:
return True
return False
def _normalize_sort(sort_by: str | None, sort_order: str | None) -> tuple[str, str]:
normalized_sort_by = (sort_by or "updated_at").strip()
if normalized_sort_by not in SORT_FIELD_OPTIONS:
normalized_sort_by = "updated_at"
normalized_sort_order = (sort_order or "desc").strip()
if normalized_sort_order not in SORT_ORDER_OPTIONS:
normalized_sort_order = "desc"
return normalized_sort_by, normalized_sort_order
def _database_sort_covers(sort_by: str | None) -> bool:
normalized_sort_by, _ = _normalize_sort(sort_by, None)
return normalized_sort_by != "head_name"
def _apply_database_sort(
query: Select[tuple[Household]],
*,
sort_by: str | None,
sort_order: str | None,
) -> Select[tuple[Household]]:
normalized_sort_by, normalized_sort_order = _normalize_sort(sort_by, sort_order)
if normalized_sort_by == "head_name":
return query
if normalized_sort_by == "expected_attendee_count":
sort_column = Household.expected_attendee_count
elif normalized_sort_by == "actual_attendee_count":
sort_column = Household.actual_attendee_count
elif normalized_sort_by == "child_count":
sort_column = Household.child_count
elif normalized_sort_by == "total_gift_amount":
sort_column = Household.total_gift_amount
else:
sort_column = Household.updated_at
ordered_column = sort_column.asc() if normalized_sort_order == "asc" else sort_column.desc()
ordered_id = Household.id.asc() if normalized_sort_order == "asc" else Household.id.desc()
return query.order_by(ordered_column, ordered_id)
def _sort_households(households: Sequence[Household], *, sort_by: str | None, sort_order: str | None) -> list[Household]:
normalized_sort_by, normalized_sort_order = _normalize_sort(sort_by, sort_order)
reverse = normalized_sort_order == "desc"
def sort_key(household: Household) -> tuple[object, int]:
if normalized_sort_by == "head_name":
name_sort_key, _ = _name_search_tokens(household.head_name)
primary_value: object = name_sort_key or _normalize_text(household.head_name)
elif normalized_sort_by == "expected_attendee_count":
primary_value = household.expected_attendee_count
elif normalized_sort_by == "actual_attendee_count":
primary_value = household.actual_attendee_count
elif normalized_sort_by == "child_count":
primary_value = household.child_count
elif normalized_sort_by == "total_gift_amount":
primary_value = household.total_gift_amount
else:
primary_value = household.updated_at or datetime.min
return primary_value, household.id
return sorted(households, key=sort_key, reverse=reverse)
def slugify_option_label(label: str) -> str:
normalized = _normalize_text(label)
ascii_tokens = "".join(lazy_pinyin(normalized)).lower()
slug = re.sub(r"[^a-z0-9]+", "-", ascii_tokens).strip("-")
if slug:
return slug[:48]
fallback = re.sub(r"[^a-z0-9]+", "-", normalized).strip("-")
return (fallback or "tag")[:48]