"""Query, option-management, and form-parsing helpers for household records."""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal, InvalidOperation
import re

from sqlalchemy import Select, func
from sqlalchemy.orm import selectinload

try:
    from pypinyin import Style, lazy_pinyin
except ImportError:  # pragma: no cover - dependency is installed in runtime verification
    Style = None

    def _lazy_pinyin_fallback(value: str, style: object | None = None) -> list[str]:
        """Stand-in for ``lazy_pinyin`` that returns the input unchanged."""
        _ = style
        return [value]

    lazy_pinyin = _lazy_pinyin_fallback

from app.extensions import db
from app.models import Household, OptionItem

# Allowed raw values for the enumerated household fields.
ATTENDANCE_STATUS_OPTIONS = ("pending", "attending", "absent", "partial")
INVITE_STATUS_OPTIONS = ("not_invited", "invited", "confirmed", "declined")
GIVEN_STATUS_OPTIONS = ("not_given", "given")
CHILD_RED_PACKET_STATUS_OPTIONS = ("not_given", "partial", "given")
SIDE_OPTIONS = ("groom_side", "bride_side", "both_side")
SORT_FIELD_OPTIONS = (
    "updated_at",
    "head_name",
    "expected_attendee_count",
    "actual_attendee_count",
    "child_count",
    "total_gift_amount",
)
SORT_ORDER_OPTIONS = ("asc", "desc")
HOMEPAGE_PER_PAGE_OPTIONS = (10, 20, 50)
DEFAULT_HOMEPAGE_PER_PAGE = 10

# Display labels keyed by raw value.
SIDE_LABELS = {
    "groom_side": "男方",
    "bride_side": "女方",
    "both_side": "双方共有",
}
INVITE_STATUS_LABELS = {
    "not_invited": "未邀请",
    "invited": "已邀请",
    "confirmed": "已确认",
    "declined": "已婉拒",
}
ATTENDANCE_STATUS_LABELS = {
    "pending": "待确认",
    "attending": "到场",
    "absent": "缺席",
    "partial": "部分到场",
}
GIVEN_STATUS_LABELS = {
    "not_given": "未发",
    "given": "已发",
}
CHILD_RED_PACKET_STATUS_LABELS = {
    "not_given": "未发",
    "partial": "部分发放",
    "given": "已发",
}
# Field name -> (raw value -> display label); consumed by household_value_label.
HOUSEHOLD_VALUE_LABELS = {
    "side": SIDE_LABELS,
    "invite_status": INVITE_STATUS_LABELS,
    "attendance_status": ATTENDANCE_STATUS_LABELS,
    "favor_status": GIVEN_STATUS_LABELS,
    "candy_status": GIVEN_STATUS_LABELS,
    "child_red_packet_status": CHILD_RED_PACKET_STATUS_LABELS,
}
SORT_FIELD_LABELS = {
    "updated_at": "最近更新时间",
    "head_name": "户主姓名",
    "expected_attendee_count": "预计人数",
    "actual_attendee_count": "实际到场人数",
    "child_count": "儿童人数",
    "total_gift_amount": "礼金金额",
}
ADMIN_EDITABLE_FIELDS = (
    "household_code",
    "head_name",
    "phone",
    "side",
    "relation_category_option_id",
    "relation_detail_option_id",
    "tag_option_ids_json",
    "invite_status",
    "attendance_status",
    "expected_attendee_count",
    "actual_attendee_count",
    "child_count",
    "red_packet_child_count",
    "total_gift_amount",
    "gift_method_option_id",
    "gift_scene_option_id",
    "favor_status",
    "candy_status",
    "child_red_packet_status",
    "note",
)
ENTRY_EDITABLE_FIELDS = (
    "head_name",
    "phone",
    "attendance_status",
    "actual_attendee_count",
    "child_count",
    "red_packet_child_count",
    "total_gift_amount",
    "gift_method_option_id",
    "gift_scene_option_id",
    "favor_status",
    "candy_status",
    "child_red_packet_status",
    "note",
)
# Digits, plus, hyphen and whitespace only; 6 to 32 characters.
PHONE_PATTERN = re.compile(r"^[0-9+\-\s]{6,32}$")


@dataclass(frozen=True)
class HouseholdHomepagePage:
    """One page of the filtered household list plus pagination metadata."""

    items: Sequence[Household]  # households on the current page
    filtered_households: Sequence[Household]  # every household matching the filters
    filtered_count: int  # len(filtered_households)
    page: int  # 1-based page number after clamping
    per_page: int  # page size after normalization
    total_pages: int  # at least 1, even when nothing matches
    has_prev: bool
    has_next: bool
    page_start: int  # 1-based position of the first item; 0 when empty
    page_end: int  # 1-based position of the last item; 0 when empty


def household_query(
    _search_term: str | None = None,
    *,
    side: str | None = None,
    invite_status: str | None = None,
    attendance_status: str | None = None,
    tag_option_id: int | None = None,
    sort_by: str | None = None,
    sort_order: str | None = None,
) -> Select[tuple[Household]]:
    """Build the SELECT for non-deleted households with relationships eager-loaded.

    ``_search_term`` is accepted but not used here; ``list_households`` applies
    the text search in Python. Filter values outside the known option tuples
    are ignored. ``tag_option_id`` only narrows the query to rows whose tag
    JSON is not NULL; ``list_households`` does the exact membership test.
    """
    query = (
        db.select(Household)
        .where(Household.deleted_at.is_(None))
        .options(
            selectinload(Household.relation_category_option),
            selectinload(Household.relation_detail_option),
            selectinload(Household.gift_method_option),
            selectinload(Household.gift_scene_option),
            selectinload(Household.members),
            selectinload(Household.updated_by_account),
        )
    )

    normalized_side = (side or "").strip()
    if normalized_side in SIDE_OPTIONS:
        query = query.where(Household.side == normalized_side)

    normalized_invite_status = (invite_status or "").strip()
    if normalized_invite_status in INVITE_STATUS_OPTIONS:
        query = query.where(Household.invite_status == normalized_invite_status)

    normalized_attendance_status = (attendance_status or "").strip()
    if normalized_attendance_status in ATTENDANCE_STATUS_OPTIONS:
        query = query.where(Household.attendance_status == normalized_attendance_status)

    if tag_option_id is not None:
        query = query.where(Household.tag_option_ids_json.is_not(None))

    query = _apply_database_sort(query, sort_by=sort_by, sort_order=sort_order)
    return query


def list_households(
    *,
    search_term: str | None = None,
    side: str | None = None,
    invite_status: str | None = None,
    attendance_status: str | None = None,
    tag_option_id: int | None = None,
    sort_by: str | None = None,
    sort_order: str | None = None,
    limit: int | None = 50,
) -> Sequence[Household]:
    """Return households matching the filters, searched and sorted.

    Column filters and most sorts run in the database. Text search, exact tag
    membership and the ``head_name`` sort run in Python afterwards. ``limit``
    truncates the final list; ``None`` returns everything.
    """
    result = db.session.execute(
        household_query(
            search_term,
            side=side,
            invite_status=invite_status,
            attendance_status=attendance_status,
            tag_option_id=tag_option_id,
            sort_by=sort_by,
            sort_order=sort_order,
        ),
    )
    households = result.scalars().all()

    normalized_term = normalize_search_term(search_term)
    if normalized_term:
        # Load the tag labels once rather than once per household.
        tag_label_map = _enabled_tag_label_map()
        households = [
            household
            for household in households
            if _household_matches_search(household, normalized_term, tag_label_map=tag_label_map)
        ]

    if tag_option_id is not None:
        households = [
            household
            for household in households
            if tag_option_id in (household.tag_option_ids_json or [])
        ]

    if not _database_sort_covers(sort_by):
        households = _sort_households(households, sort_by=sort_by, sort_order=sort_order)

    if limit is None:
        return households
    return households[:limit]


def paginate_households_for_homepage(
    *,
    search_term: str | None = None,
    side: str | None = None,
    invite_status: str | None = None,
    attendance_status: str | None = None,
    tag_option_id: int | None = None,
    sort_by: str | None = None,
    sort_order: str | None = None,
    page: int = 1,
    per_page: int = DEFAULT_HOMEPAGE_PER_PAGE,
) -> HouseholdHomepagePage:
    """Load all matching households and slice out the requested page.

    ``per_page`` falls back to the default when it is not an allowed page size,
    and ``page`` is clamped into ``[1, total_pages]``.
    """
    normalized_per_page = per_page if per_page in HOMEPAGE_PER_PAGE_OPTIONS else DEFAULT_HOMEPAGE_PER_PAGE
    filtered_households = list_households(
        search_term=search_term,
        side=side,
        invite_status=invite_status,
        attendance_status=attendance_status,
        tag_option_id=tag_option_id,
        sort_by=sort_by,
        sort_order=sort_order,
        limit=None,
    )
    filtered_count = len(filtered_households)
    # Ceiling division, with a floor of one page so an empty result still has page 1.
    total_pages = max(1, (filtered_count + normalized_per_page - 1) // normalized_per_page)
    normalized_page = min(max(page, 1), total_pages)
    start_index = (normalized_page - 1) * normalized_per_page
    end_index = start_index + normalized_per_page
    page_items = filtered_households[start_index:end_index]
    page_start = start_index + 1 if filtered_count else 0
    page_end = min(end_index, filtered_count) if filtered_count else 0
    return HouseholdHomepagePage(
        items=page_items,
        filtered_households=filtered_households,
        filtered_count=filtered_count,
        page=normalized_page,
        per_page=normalized_per_page,
        total_pages=total_pages,
        has_prev=normalized_page > 1,
        has_next=normalized_page < total_pages,
        page_start=page_start,
        page_end=page_end,
    )


def get_household_or_none(household_id: int) -> Household | None:
    """Return the non-deleted household with ``household_id``, or ``None``."""
    result = db.session.execute(
        household_query().where(Household.id == household_id),
    )
    return result.scalar_one_or_none()


def get_homepage_stats() -> dict[str, int | Decimal]:
    """Aggregate counts and gift total over all non-deleted households in SQL."""
    summary = db.session.execute(
        db.select(
            func.count(Household.id),
            func.coalesce(func.sum(Household.expected_attendee_count), 0),
            func.coalesce(func.sum(Household.child_count), 0),
            func.coalesce(func.sum(Household.total_gift_amount), Decimal("0.00")),
        ).where(Household.deleted_at.is_(None)),
    ).one()
    return {
        "total_households": int(summary[0] or 0),
        "total_expected_attendees": int(summary[1] or 0),
        "total_children": int(summary[2] or 0),
        "total_gift_amount": summary[3] or Decimal("0.00"),
    }


def get_filtered_homepage_stats(households: Sequence[Household]) -> dict[str, int | Decimal]:
    """Compute the homepage statistics in Python for an already-loaded list."""
    total_expected_attendees = sum(household.expected_attendee_count for household in households)
    total_children = sum(household.child_count for household in households)
    total_gift_amount = sum(
        (household.total_gift_amount for household in households),
        start=Decimal("0.00"),
    )
    total_attending_households = sum(
        1 for household in households if household.attendance_status in {"attending", "partial"}
    )
    return {
        "total_households": len(households),
        "total_expected_attendees": total_expected_attendees,
        "total_children": total_children,
        "total_gift_amount": total_gift_amount,
        "total_attending_households": total_attending_households,
    }


def list_enabled_options(option_group: str) -> Sequence[OptionItem]:
    """Return enabled options of ``option_group`` ordered by sort order, then id."""
    result = db.session.execute(
        db.select(OptionItem)
        .where(
            OptionItem.option_group == option_group,
            OptionItem.is_enabled.is_(True),
        )
        .order_by(OptionItem.sort_order.asc(), OptionItem.id.asc()),
    )
    return result.scalars().all()


def parse_new_tag_labels(raw_value: str | None) -> tuple[list[str], list[str]]:
    """Split free-text tag input into unique labels.

    Labels are separated by newlines, ASCII commas or fullwidth commas
    (U+FF0C). Blank chunks are dropped, duplicates keep their first position,
    and labels longer than 32 characters are reported as errors.

    Returns:
        ``(labels, errors)``.
    """
    if not raw_value:
        return [], []

    normalized_labels: list[str] = []
    errors: list[str] = []
    # The original character class listed the ASCII comma twice; the second
    # entry is written as U+FF0C so fullwidth-comma input is split as well.
    for chunk in re.split(r"[\n,\uff0c]+", raw_value):
        label = chunk.strip()
        if not label:
            continue
        if len(label) > 32:
            errors.append(f"标签“{label[:12]}...”过长,请控制在 32 个字符内。")
            continue
        if label not in normalized_labels:
            normalized_labels.append(label)
    return normalized_labels, errors


def parse_new_relation_category_label(raw_value: str | None) -> tuple[str | None, list[str]]:
    """Parse input that may name at most one new relation category.

    Returns:
        ``(label_or_None, errors)``; more than one label is an error.
    """
    if not raw_value:
        return None, []
    labels, errors = parse_new_tag_labels(raw_value)
    if len(labels) > 1:
        errors.append("一次只能新增一个关系分类。")
        return None, errors
    return (labels[0] if labels else None), errors


def _next_available_option_code(option_group: str, label: str) -> str:
    """Return a slug for ``label`` not yet used as an option code in ``option_group``.

    Starts from ``slugify_option_label(label)`` and appends ``-2``, ``-3``, ...
    until no row with that code exists in the group.
    """
    option_code = slugify_option_label(label)
    candidate_code = option_code
    suffix = 2
    while db.session.execute(
        db.select(OptionItem.id).where(
            OptionItem.option_group == option_group,
            OptionItem.option_code == candidate_code,
        ).limit(1)
    ).scalar_one_or_none() is not None:
        candidate_code = f"{option_code}-{suffix}"
        suffix += 1
    return candidate_code


def ensure_tag_options(labels: Sequence[str], *, actor_id: int | None = None) -> list[OptionItem]:
    """Return a tag option for every label, creating the missing ones.

    Existing enabled tags are matched by exact label. New options are added to
    the session and flushed (not committed), with sort orders continuing after
    the current maximum among enabled tags.
    """
    if not labels:
        return []

    existing_options = list_enabled_options("tag")
    existing_by_label = {option.option_label: option for option in existing_options}
    sort_order_seed = max((option.sort_order for option in existing_options), default=0)
    ensured: list[OptionItem] = []

    for index, label in enumerate(labels, start=1):
        existing = existing_by_label.get(label)
        if existing is not None:
            ensured.append(existing)
            continue

        option = OptionItem(
            option_group="tag",
            option_code=_next_available_option_code("tag", label),
            option_label=label,
            sort_order=sort_order_seed + index,
            is_enabled=True,
            is_system=False,
            created_by=actor_id,
            updated_by=actor_id,
        )
        db.session.add(option)
        db.session.flush()
        # Remember the new option so a repeated label in this call reuses it.
        existing_by_label[label] = option
        ensured.append(option)

    return ensured


def ensure_relation_category_option(label: str | None, *, actor_id: int | None = None) -> OptionItem | None:
    """Return the enabled relation-category option for ``label``, creating it if absent.

    Returns ``None`` for an empty label. A new option is added to the session
    and flushed (not committed) with a sort order 10 above the current maximum.
    """
    if not label:
        return None

    existing_options = list_enabled_options("relation_category")
    for option in existing_options:
        if option.option_label == label:
            return option

    candidate_code = _next_available_option_code("relation_category", label)
    sort_order_seed = max((option.sort_order for option in existing_options), default=0)
    option = OptionItem(
        option_group="relation_category",
        option_code=candidate_code,
        option_label=label,
        sort_order=sort_order_seed + 10,
        is_enabled=True,
        is_system=False,
        created_by=actor_id,
        updated_by=actor_id,
    )
    db.session.add(option)
    db.session.flush()
    return option


def mask_phone(phone: str | None) -> str:
    """Return a display-safe version of ``phone``.

    With seven or more digits, the first three and last four digits are kept.
    Otherwise values of up to four characters are returned unchanged and
    longer ones keep only their first and last two characters.
    """
    if not phone:
        return "未登记"

    digits_only = "".join(character for character in phone if character.isdigit())
    if len(digits_only) >= 7:
        return f"{digits_only[:3]}****{digits_only[-4:]}"
    if len(phone) <= 4:
        return phone
    return f"{phone[:2]}***{phone[-2:]}"


def household_value_label(field_name: str, value: str | None) -> str:
    """Translate a raw field value into its display label.

    Returns ``"-"`` for ``None`` or blank values, and the stripped value itself
    when no label is registered for it.
    """
    if value is None:
        return "-"
    normalized_value = value.strip()
    if not normalized_value:
        return "-"
    return HOUSEHOLD_VALUE_LABELS.get(field_name, {}).get(normalized_value, normalized_value)


def suggest_next_household_code() -> str:
    """Suggest the next ``H###`` code, one above the largest number in use.

    All digits in each stored code are concatenated into one integer. Deleted
    households are included because the query has no ``deleted_at`` filter.
    """
    result = db.session.execute(
        db.select(Household.household_code),
    )
    max_numeric_suffix = 0
    for household_code in result.scalars().all():
        if not household_code:
            continue
        digits = "".join(character for character in household_code if character.isdigit())
        if not digits:
            continue
        max_numeric_suffix = max(max_numeric_suffix, int(digits))
    return f"H{max_numeric_suffix + 1:03d}"


def build_new_household_draft(*, household_code: str | None = None) -> Household:
    """Create an unsaved ``Household`` filled with default values."""
    household = Household()
    household.household_code = household_code or suggest_next_household_code()
    household.head_name = ""
    household.phone = None
    household.side = "groom_side"
    household.invite_status = "not_invited"
    household.attendance_status = "pending"
    household.expected_attendee_count = 0
    household.actual_attendee_count = 0
    household.child_count = 0
    household.red_packet_child_count = 0
    household.total_gift_amount = Decimal("0.00")
    household.favor_status = "not_given"
    household.candy_status = "not_given"
    household.child_red_packet_status = "not_given"
    household.tag_option_ids_json = []
    household.note = None
    household.version = 1
    return household


def is_household_code_taken(household_code: str, *, excluding_household_id: int | None = None) -> bool:
    """Return whether another household (deleted ones included) uses this code."""
    query = db.select(Household.id).where(
        Household.household_code == household_code,
    )
    if excluding_household_id is not None:
        query = query.where(Household.id != excluding_household_id)
    return db.session.execute(query.limit(1)).scalar_one_or_none() is not None


def is_head_name_taken(head_name: str, *, excluding_household_id: int | None = None) -> bool:
    """Return whether another non-deleted household has this head name."""
    query = db.select(Household.id).where(
        Household.deleted_at.is_(None),
        Household.head_name == head_name,
    )
    if excluding_household_id is not None:
        query = query.where(Household.id != excluding_household_id)
    return db.session.execute(query.limit(1)).scalar_one_or_none() is not None


def serialize_admin_edit_snapshot(household: Household) -> dict[str, object]:
    """Return a plain-dict snapshot of the admin-visible fields.

    Tag ids are sorted, the gift amount is stringified and ``deleted_at`` is
    rendered with ``isoformat(sep=" ")`` when set.
    """
    return {
        "household_code": household.household_code,
        "head_name": household.head_name,
        "phone": household.phone,
        "side": household.side,
        "relation_category_option_id": household.relation_category_option_id,
        "relation_detail_option_id": household.relation_detail_option_id,
        "tag_option_ids_json": sorted(household.tag_option_ids_json or []),
        "invite_status": household.invite_status,
        "attendance_status": household.attendance_status,
        "expected_attendee_count": household.expected_attendee_count,
        "actual_attendee_count": household.actual_attendee_count,
        "child_count": household.child_count,
        "red_packet_child_count": household.red_packet_child_count,
        "total_gift_amount": str(household.total_gift_amount),
        "gift_method_option_id": household.gift_method_option_id,
        "gift_scene_option_id": household.gift_scene_option_id,
        "favor_status": household.favor_status,
        "candy_status": household.candy_status,
        "child_red_packet_status": household.child_red_packet_status,
        "note": household.note,
        "deleted_at": household.deleted_at.isoformat(sep=" ") if household.deleted_at is not None else None,
        "version": household.version,
    }


def serialize_entry_edit_snapshot(household: Household) -> dict[str, object]:
    """Return a plain-dict snapshot of the entry-form fields plus ``version``."""
    return {
        "head_name": household.head_name,
        "phone": household.phone,
        "attendance_status": household.attendance_status,
        "actual_attendee_count": household.actual_attendee_count,
        "child_count": household.child_count,
        "red_packet_child_count": household.red_packet_child_count,
        "total_gift_amount": str(household.total_gift_amount),
        "gift_method_option_id": household.gift_method_option_id,
        "gift_scene_option_id": household.gift_scene_option_id,
        "favor_status": household.favor_status,
        "candy_status": household.candy_status,
        "child_red_packet_status": household.child_red_packet_status,
        "note": household.note,
        "version": household.version,
    }


def parse_entry_form(
    form: dict[str, str],
    *,
    valid_method_ids: set[int],
    valid_scene_ids: set[int],
) -> tuple[dict[str, object], list[str]]:
    """Validate the entry form.

    Returns:
        ``(values, [])`` on success, or ``({}, errors)`` when any field fails.
    """
    errors: list[str] = []

    phone = form.get("phone", "").strip()
    if phone and not PHONE_PATTERN.fullmatch(phone):
        errors.append("联系电话格式不正确,请仅输入数字、空格或常见连接符。")

    attendance_status = form.get("attendance_status", "").strip()
    if attendance_status not in ATTENDANCE_STATUS_OPTIONS:
        errors.append("到场状态不合法。")

    favor_status = form.get("favor_status", "").strip()
    if favor_status not in GIVEN_STATUS_OPTIONS:
        errors.append("伴手礼状态不合法。")

    candy_status = form.get("candy_status", "").strip()
    if candy_status not in GIVEN_STATUS_OPTIONS:
        errors.append("喜糖状态不合法。")

    child_red_packet_status = form.get("child_red_packet_status", "").strip()
    if child_red_packet_status not in CHILD_RED_PACKET_STATUS_OPTIONS:
        errors.append("儿童红包状态不合法。")

    actual_attendee_count = _parse_non_negative_int(form.get("actual_attendee_count", "0"), "实际到场人数", errors)
    child_count = _parse_non_negative_int(form.get("child_count", "0"), "儿童人数", errors)
    red_packet_child_count = _parse_non_negative_int(form.get("red_packet_child_count", "0"), "需红包儿童人数", errors)
    total_gift_amount = _parse_non_negative_decimal(form.get("total_gift_amount", "0"), "礼金金额", errors)

    # Cross-field check only when both counts parsed successfully.
    if red_packet_child_count is not None and child_count is not None and red_packet_child_count > child_count:
        errors.append("需红包儿童人数不能大于儿童人数。")

    gift_method_option_id = _parse_optional_option_id(
        form.get("gift_method_option_id", ""),
        valid_ids=valid_method_ids,
        field_label="礼金方式",
        errors=errors,
    )
    gift_scene_option_id = _parse_optional_option_id(
        form.get("gift_scene_option_id", ""),
        valid_ids=valid_scene_ids,
        field_label="礼金记录场景",
        errors=errors,
    )

    if errors:
        return {}, errors

    return {
        "phone": phone or None,
        "attendance_status": attendance_status,
        "actual_attendee_count": actual_attendee_count,
        "child_count": child_count,
        "red_packet_child_count": red_packet_child_count,
        "total_gift_amount": total_gift_amount,
        "gift_method_option_id": gift_method_option_id,
        "gift_scene_option_id": gift_scene_option_id,
        "favor_status": favor_status,
        "candy_status": candy_status,
        "child_red_packet_status": child_red_packet_status,
        "note": form.get("note", "").strip() or None,
    }, []


def parse_admin_form(
    form: dict[str, str],
    *,
    raw_tag_option_ids: Sequence[str],
    valid_relation_category_ids: set[int],
    relation_detail_parent_map: dict[int, int | None],
    valid_tag_ids: set[int],
    valid_method_ids: set[int],
    valid_scene_ids: set[int],
    require_household_code: bool = True,
) -> tuple[dict[str, object], list[str]]:
    """Validate the admin form.

    ``relation_detail_parent_map`` maps each relation-detail option id to its
    parent category id. A submitted detail must belong to the submitted
    category. When the map is empty the detail field is not read at all.

    Returns:
        ``(values, [])`` on success, or ``({}, errors)`` when any field fails.
    """
    errors: list[str] = []

    household_code = form.get("household_code", "").strip()
    if require_household_code and not household_code:
        errors.append("户编码不能为空。")

    head_name = form.get("head_name", "").strip()
    if not head_name:
        errors.append("户主姓名不能为空。")

    phone = form.get("phone", "").strip()
    if phone and not PHONE_PATTERN.fullmatch(phone):
        errors.append("联系电话格式不正确,请仅输入数字、空格或常见连接符。")

    side = form.get("side", "").strip()
    if side not in SIDE_OPTIONS:
        errors.append("所属侧不合法。")

    invite_status = form.get("invite_status", "").strip()
    if invite_status not in INVITE_STATUS_OPTIONS:
        errors.append("邀请状态不合法。")

    attendance_status = form.get("attendance_status", "").strip()
    if attendance_status not in ATTENDANCE_STATUS_OPTIONS:
        errors.append("到场状态不合法。")

    favor_status = form.get("favor_status", "").strip()
    if favor_status not in GIVEN_STATUS_OPTIONS:
        errors.append("伴手礼状态不合法。")

    candy_status = form.get("candy_status", "").strip()
    if candy_status not in GIVEN_STATUS_OPTIONS:
        errors.append("喜糖状态不合法。")

    child_red_packet_status = form.get("child_red_packet_status", "").strip()
    if child_red_packet_status not in CHILD_RED_PACKET_STATUS_OPTIONS:
        errors.append("儿童红包状态不合法。")

    expected_attendee_count = _parse_non_negative_int(form.get("expected_attendee_count", "0"), "预计人数", errors)
    actual_attendee_count = _parse_non_negative_int(form.get("actual_attendee_count", "0"), "实际到场人数", errors)
    child_count = _parse_non_negative_int(form.get("child_count", "0"), "儿童人数", errors)
    red_packet_child_count = _parse_non_negative_int(form.get("red_packet_child_count", "0"), "需红包儿童人数", errors)
    total_gift_amount = _parse_non_negative_decimal(form.get("total_gift_amount", "0"), "礼金金额", errors)

    # Cross-field check only when both counts parsed successfully.
    if red_packet_child_count is not None and child_count is not None and red_packet_child_count > child_count:
        errors.append("需红包儿童人数不能大于儿童人数。")

    relation_category_option_id = _parse_optional_option_id(
        form.get("relation_category_option_id", ""),
        valid_ids=valid_relation_category_ids,
        field_label="关系分类",
        errors=errors,
    )

    relation_detail_option_id: int | None = None
    if relation_detail_parent_map:
        relation_detail_option_id = _parse_optional_option_id(
            form.get("relation_detail_option_id", ""),
            valid_ids=set(relation_detail_parent_map),
            field_label="具体关系",
            errors=errors,
        )
        if relation_detail_option_id is not None:
            expected_parent_id = relation_detail_parent_map.get(relation_detail_option_id)
            if relation_category_option_id is None or expected_parent_id != relation_category_option_id:
                errors.append("具体关系与关系分类不匹配。")

    tag_option_ids = _parse_tag_option_ids(raw_tag_option_ids, valid_ids=valid_tag_ids, errors=errors)

    gift_method_option_id = _parse_optional_option_id(
        form.get("gift_method_option_id", ""),
        valid_ids=valid_method_ids,
        field_label="礼金方式",
        errors=errors,
    )
    gift_scene_option_id = _parse_optional_option_id(
        form.get("gift_scene_option_id", ""),
        valid_ids=valid_scene_ids,
        field_label="礼金记录场景",
        errors=errors,
    )

    if errors:
        return {}, errors

    return {
        "household_code": household_code or None,
        "head_name": head_name,
        "phone": phone or None,
        "side": side,
        "relation_category_option_id": relation_category_option_id,
        "relation_detail_option_id": relation_detail_option_id,
        "tag_option_ids_json": tag_option_ids,
        "invite_status": invite_status,
        "attendance_status": attendance_status,
        "expected_attendee_count": expected_attendee_count,
        "actual_attendee_count": actual_attendee_count,
        "child_count": child_count,
        "red_packet_child_count": red_packet_child_count,
        "total_gift_amount": total_gift_amount,
        "gift_method_option_id": gift_method_option_id,
        "gift_scene_option_id": gift_scene_option_id,
        "favor_status": favor_status,
        "candy_status": candy_status,
        "child_red_packet_status": child_red_packet_status,
        "note": form.get("note", "").strip() or None,
    }, []


def _parse_non_negative_int(raw_value: str, field_label: str, errors: list[str]) -> int | None:
    """Parse a non-negative integer; on failure append an error and return ``None``."""
    value = raw_value.strip()
    try:
        parsed = int(value)
    except ValueError:
        errors.append(f"{field_label}必须是非负整数。")
        return None
    if parsed < 0:
        errors.append(f"{field_label}必须是非负整数。")
        return None
    return parsed


def _parse_non_negative_decimal(raw_value: str, field_label: str, errors: list[str]) -> Decimal | None:
    """Parse a non-negative amount rounded to two decimal places.

    On failure an error is appended to ``errors`` and ``None`` is returned.
    NaN, infinities and values too large to quantize are reported as invalid
    amounts instead of raising ``InvalidOperation``.
    """
    value = raw_value.strip()
    try:
        parsed = Decimal(value)
    except InvalidOperation:
        errors.append(f"{field_label}必须是合法金额。")
        return None
    # Decimal() accepts "NaN" and "Infinity"; comparing or quantizing them raises.
    if not parsed.is_finite():
        errors.append(f"{field_label}必须是合法金额。")
        return None
    if parsed < 0:
        errors.append(f"{field_label}不能为负数。")
        return None
    try:
        return parsed.quantize(Decimal("0.01"))
    except InvalidOperation:
        # The quantized result would exceed the context precision (e.g. "1e30").
        errors.append(f"{field_label}必须是合法金额。")
        return None


def _parse_optional_option_id(
    raw_value: str,
    *,
    valid_ids: set[int],
    field_label: str,
    errors: list[str],
) -> int | None:
    """Parse an optional option id.

    Blank input returns ``None`` with no error. A non-integer or an id outside
    ``valid_ids`` appends an error and returns ``None``.
    """
    value = raw_value.strip()
    if not value:
        return None
    try:
        parsed = int(value)
    except ValueError:
        errors.append(f"{field_label}选项不合法。")
        return None
    if parsed not in valid_ids:
        errors.append(f"{field_label}选项不合法。")
        return None
    return parsed


def _parse_tag_option_ids(raw_values: Sequence[str], *, valid_ids: set[int], errors: list[str]) -> list[int]:
    """Parse submitted tag ids into a sorted, de-duplicated list.

    Blank entries are skipped; each invalid entry appends one error.
    """
    parsed_values: set[int] = set()
    for raw_value in raw_values:
        value = raw_value.strip()
        if not value:
            continue
        try:
            parsed = int(value)
        except ValueError:
            errors.append("标签选项不合法。")
            continue
        if parsed not in valid_ids:
            errors.append("标签选项不合法。")
            continue
        parsed_values.add(parsed)
    return sorted(parsed_values)


def normalize_search_term(search_term: str | None) -> str:
    """Lower-case ``search_term`` and remove all whitespace; empty for falsy input."""
    if not search_term:
        return ""
    return "".join(search_term.strip().lower().split())


def _normalize_text(value: str | None) -> str:
    """Lower-case ``value`` and remove all whitespace; empty for falsy input."""
    if not value:
        return ""
    return "".join(value.strip().lower().split())


def _name_search_tokens(name: str | None) -> tuple[str, str]:
    """Return ``(full_pinyin, initials)`` for ``name``, both lower-cased.

    When pypinyin is unavailable both elements are the normalized name itself.
    """
    normalized_name = _normalize_text(name)
    if not normalized_name:
        return "", ""

    if Style is None:
        fallback_tokens = lazy_pinyin(normalized_name)
        fallback_value = "".join(fallback_tokens).lower()
        return fallback_value, fallback_value

    full_pinyin = "".join(lazy_pinyin(normalized_name, style=Style.NORMAL)).lower()
    initials = "".join(lazy_pinyin(normalized_name, style=Style.FIRST_LETTER)).lower()
    return full_pinyin, initials


def _enabled_tag_label_map() -> dict[int, str]:
    """Return ``{option id: label}`` for the enabled tag options."""
    return {option.id: option.option_label for option in list_enabled_options("tag")}


def _household_matches_search(
    household: Household,
    normalized_term: str,
    *,
    tag_label_map: dict[int, str] | None = None,
) -> bool:
    """Return whether ``normalized_term`` occurs in any searchable text.

    Checked in order: head name and note, labels of the household's tags, the
    head name's pinyin and initials, then each member's name, pinyin and
    initials.

    Args:
        household: The household to test.
        normalized_term: Search term as produced by ``normalize_search_term``.
        tag_label_map: Optional ``{tag id: label}`` mapping. Pass it when
            testing many households so the tag options are queried once; when
            omitted it is loaded on demand.
    """
    direct_values = [
        household.head_name,
        household.note,
    ]
    if any(normalized_term in _normalize_text(value) for value in direct_values):
        return True

    if tag_label_map is None:
        tag_label_map = _enabled_tag_label_map()
    if any(
        normalized_term in _normalize_text(tag_label_map.get(tag_id))
        for tag_id in (household.tag_option_ids_json or [])
    ):
        return True

    full_pinyin, initials = _name_search_tokens(household.head_name)
    if normalized_term in full_pinyin or normalized_term in initials:
        return True

    for member in household.members:
        if normalized_term in _normalize_text(member.name):
            return True
        member_full_pinyin, member_initials = _name_search_tokens(member.name)
        if normalized_term in member_full_pinyin or normalized_term in member_initials:
            return True

    return False


def _normalize_sort(sort_by: str | None, sort_order: str | None) -> tuple[str, str]:
    """Return a valid ``(sort_by, sort_order)``, defaulting to ``("updated_at", "desc")``."""
    normalized_sort_by = (sort_by or "updated_at").strip()
    if normalized_sort_by not in SORT_FIELD_OPTIONS:
        normalized_sort_by = "updated_at"

    normalized_sort_order = (sort_order or "desc").strip()
    if normalized_sort_order not in SORT_ORDER_OPTIONS:
        normalized_sort_order = "desc"

    return normalized_sort_by, normalized_sort_order


def _database_sort_covers(sort_by: str | None) -> bool:
    """Return whether the database ORDER BY handles this sort (all but ``head_name``)."""
    normalized_sort_by, _ = _normalize_sort(sort_by, None)
    return normalized_sort_by != "head_name"


def _apply_database_sort(
    query: Select[tuple[Household]],
    *,
    sort_by: str | None,
    sort_order: str | None,
) -> Select[tuple[Household]]:
    """Add ORDER BY for the requested column, with ``Household.id`` as tie-breaker.

    The ``head_name`` sort is left to ``_sort_households``, so the query is
    returned unchanged in that case.
    """
    normalized_sort_by, normalized_sort_order = _normalize_sort(sort_by, sort_order)
    if normalized_sort_by == "head_name":
        return query

    if normalized_sort_by == "expected_attendee_count":
        sort_column = Household.expected_attendee_count
    elif normalized_sort_by == "actual_attendee_count":
        sort_column = Household.actual_attendee_count
    elif normalized_sort_by == "child_count":
        sort_column = Household.child_count
    elif normalized_sort_by == "total_gift_amount":
        sort_column = Household.total_gift_amount
    else:
        sort_column = Household.updated_at

    ordered_column = sort_column.asc() if normalized_sort_order == "asc" else sort_column.desc()
    ordered_id = Household.id.asc() if normalized_sort_order == "asc" else Household.id.desc()
    return query.order_by(ordered_column, ordered_id)


def _sort_households(
    households: Sequence[Household],
    *,
    sort_by: str | None,
    sort_order: str | None,
) -> list[Household]:
    """Sort households in Python by the requested field, tie-broken by id.

    ``head_name`` sorts by the name's pinyin, falling back to the normalized
    name. The direction applies to both the primary key and the id tie-breaker.
    """
    normalized_sort_by, normalized_sort_order = _normalize_sort(sort_by, sort_order)
    reverse = normalized_sort_order == "desc"

    def sort_key(household: Household) -> tuple[object, int]:
        if normalized_sort_by == "head_name":
            name_sort_key, _ = _name_search_tokens(household.head_name)
            primary_value: object = name_sort_key or _normalize_text(household.head_name)
        elif normalized_sort_by == "expected_attendee_count":
            primary_value = household.expected_attendee_count
        elif normalized_sort_by == "actual_attendee_count":
            primary_value = household.actual_attendee_count
        elif normalized_sort_by == "child_count":
            primary_value = household.child_count
        elif normalized_sort_by == "total_gift_amount":
            primary_value = household.total_gift_amount
        else:
            primary_value = household.updated_at or datetime.min
        return primary_value, household.id

    return sorted(households, key=sort_key, reverse=reverse)


def slugify_option_label(label: str) -> str:
    """Turn ``label`` into a lower-case ``[a-z0-9-]`` slug of at most 48 characters.

    The label is run through ``lazy_pinyin`` first. If that yields no usable
    characters, the normalized label is slugified directly, and ``"tag"`` is
    the last-resort value.
    """
    normalized = _normalize_text(label)
    ascii_tokens = "".join(lazy_pinyin(normalized)).lower()
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_tokens).strip("-")
    if slug:
        return slug[:48]
    fallback = re.sub(r"[^a-z0-9]+", "-", normalized).strip("-")
    return (fallback or "tag")[:48]