You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
422 lines
19 KiB
422 lines
19 KiB
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
import io
|
|
|
|
import pytest
|
|
|
|
from app import create_app
|
|
from app.extensions import db
|
|
from app.models import Account, AuditLog, Household, OptionItem
|
|
|
|
|
|
@pytest.fixture
def app():
    """Yield an application created with the ``"testing"`` configuration.

    The schema is created before the test body runs; afterwards the scoped
    session is removed and every table is dropped, all inside the same
    application context.
    """
    flask_app = create_app("testing")

    with flask_app.app_context():
        # Setup: build the schema for this test.
        db.create_all()
        yield flask_app
        # Teardown: release the session before dropping the tables.
        db.session.remove()
        db.drop_all()
|
|
|
|
|
|
@pytest.fixture
def client(app):
    """Return a test client obtained from the ``app`` fixture."""
    test_client = app.test_client()
    return test_client
|
|
|
|
|
|
def create_account(*, username: str, password: str, role: str, display_name: str | None = None) -> Account:
    """Create, commit and return an ``Account`` with status ``"active"``.

    The password is applied through ``Account.set_password`` before the row is
    added to the session. Must be called inside an application context.
    """
    attributes = {
        "username": username,
        "role": role,
        "status": "active",
        "display_name": display_name,
    }
    new_account = Account(**attributes)
    new_account.set_password(password)

    session = db.session
    session.add(new_account)
    session.commit()
    return new_account
|
|
|
|
|
|
def create_option(*, option_group: str, option_code: str, option_label: str, parent_id: int | None = None) -> OptionItem:
    """Create, commit and return an enabled system ``OptionItem``.

    ``sort_order`` is fixed at 10 and both ``is_enabled`` and ``is_system``
    are set to ``True``; the remaining fields come from the arguments.
    """
    attributes = {
        "option_group": option_group,
        "option_code": option_code,
        "option_label": option_label,
        "parent_id": parent_id,
        "sort_order": 10,
        "is_enabled": True,
        "is_system": True,
    }
    item = OptionItem(**attributes)

    session = db.session
    session.add(item)
    session.commit()
    return item
|
|
|
|
|
|
def create_household(**overrides: object) -> Household:
    """Create, commit and return a ``Household`` populated with test defaults.

    Any keyword in ``overrides`` replaces the corresponding default. Values
    are coerced to the type each column is given here (``str``, ``int`` via
    ``int(str(...))``, or ``Decimal`` via ``Decimal(str(...))``).

    Optional fields:
        * ``phone`` - an explicit ``None`` is stored as ``None``.
        * ``note`` - an explicit ``None`` or an empty string is stored as
          ``None``. (Previously ``note=None`` was stringified to the literal
          text ``"None"``.)
        * ``gift_method_option_id`` / ``gift_scene_option_id`` - ``None`` when
          absent or explicitly ``None``.

    Must be called inside an application context.
    """

    def pick(key: str, default: object) -> object:
        # Single lookup point so each override is read exactly once.
        return overrides.get(key, default)

    def optional_int(key: str) -> int | None:
        raw = overrides.get(key)
        return None if raw is None else int(str(raw))

    phone = pick("phone", "13800001111")
    note = pick("note", "")

    household = Household(
        household_code=str(pick("household_code", "H001")),
        head_name=str(pick("head_name", "王大爷")),
        phone=None if phone is None else str(phone),
        side=str(pick("side", "groom_side")),
        attendance_status=str(pick("attendance_status", "pending")),
        invite_status=str(pick("invite_status", "invited")),
        expected_attendee_count=int(str(pick("expected_attendee_count", 3))),
        actual_attendee_count=int(str(pick("actual_attendee_count", 0))),
        child_count=int(str(pick("child_count", 0))),
        red_packet_child_count=int(str(pick("red_packet_child_count", 0))),
        total_gift_amount=Decimal(str(pick("total_gift_amount", "0.00"))),
        favor_status=str(pick("favor_status", "not_given")),
        candy_status=str(pick("candy_status", "not_given")),
        child_red_packet_status=str(pick("child_red_packet_status", "not_given")),
        # Guard None before str(): str(None) is the truthy text "None".
        note=None if note is None else (str(note) or None),
        gift_method_option_id=optional_int("gift_method_option_id"),
        gift_scene_option_id=optional_int("gift_scene_option_id"),
        version=int(str(pick("version", 1))),
    )
    db.session.add(household)
    db.session.commit()
    return household
|
|
|
|
|
|
def login(client, *, username: str, password: str):
    """POST the credentials to ``/auth/login`` and return the raw response.

    Redirects are not followed, so the caller receives the login endpoint's
    own response.
    """
    credentials = {"username": username, "password": password}
    return client.post("/auth/login", data=credentials, follow_redirects=False)
|
|
|
|
|
|
def csv_file(content: str, *, filename: str = "households.csv") -> tuple[io.BytesIO, str]:
    """Return ``(stream, filename)`` where ``stream`` holds ``content`` as UTF-8.

    The tuple is used as the ``file`` value in the multipart upload posts in
    this module.
    """
    payload = content.encode("utf-8")
    stream = io.BytesIO(payload)
    return stream, filename
|
|
|
|
|
|
def test_csv_template_download_returns_headers(client, app) -> None:
    """The template endpoint answers a logged-in admin with a CSV attachment.

    Checks the status code, the ``Content-Type`` prefix, the attachment
    disposition, and that the body contains the BOM-prefixed leading columns.
    """
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        create_account(role="admin", **credentials)

    login(client, **credentials)
    response = client.get("/csv/households/template")
    body = response.get_data(as_text=True)
    headers = response.headers

    assert response.status_code == 200
    assert headers["Content-Type"].startswith("text/csv")
    assert "attachment; filename=" in headers["Content-Disposition"]
    assert "\ufeffhousehold_code,head_name,phone,side,invite_status" in body
|
|
|
|
|
|
def test_csv_export_all_returns_all_households(client, app) -> None:
    """Exporting with ``scope=all`` lists both households that were created."""
    credentials = {"username": "editor", "password": "EditorPass123!"}
    with app.app_context():
        create_account(role="editor", **credentials)
        create_household(household_code="A001", head_name="王大爷", note="电话确认")
        create_household(household_code="B002", head_name="李阿姨", note="待回电")

    login(client, **credentials)
    response = client.get("/csv/households/export?scope=all")
    exported = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "A001,王大爷" in exported
    assert "B002,李阿姨" in exported
|
|
|
|
|
|
def test_csv_export_all_excludes_soft_deleted_households(client, app) -> None:
    """A household with ``deleted_at`` set is absent from the ``scope=all`` export."""
    credentials = {"username": "editor", "password": "EditorPass123!"}
    with app.app_context():
        create_account(role="editor", **credentials)
        create_household(household_code="A001", head_name="王大爷", note="电话确认")
        removed = create_household(household_code="B002", head_name="李阿姨", note="待回电")
        # Mark the second household as soft-deleted and persist the change.
        removed.deleted_at = datetime(2026, 4, 12, 8, 0)
        db.session.commit()

    login(client, **credentials)
    response = client.get("/csv/households/export?scope=all")
    exported = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "A001,王大爷" in exported
    assert "B002,李阿姨" not in exported
|
|
|
|
|
|
def test_csv_export_filtered_respects_existing_query_params(client, app) -> None:
    """A ``scope=filtered`` export with ``side=bride_side`` omits the groom-side row."""
    credentials = {"username": "editor", "password": "EditorPass123!"}
    with app.app_context():
        create_account(role="editor", **credentials)
        create_household(household_code="A001", head_name="王大爷", side="groom_side", total_gift_amount="300.00")
        create_household(household_code="B002", head_name="李阿姨", side="bride_side", total_gift_amount="800.00")

    login(client, **credentials)
    export_url = "/csv/households/export?scope=filtered&side=bride_side&sort_by=total_gift_amount&sort_order=desc"
    response = client.get(export_url)
    exported = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "B002,李阿姨" in exported
    assert "A001,王大爷" not in exported
|
|
|
|
|
|
def test_csv_import_preview_rejects_wrong_extension(client, app) -> None:
    """Uploading a ``.txt`` file to the preview endpoint shows the .csv-only message."""
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        create_account(role="admin", **credentials)

    login(client, **credentials)
    upload = (io.BytesIO(b"not,csv"), "households.txt")
    response = client.post(
        "/csv/households/import/preview",
        data={"file": upload},
        content_type="multipart/form-data",
        follow_redirects=True,
    )
    html = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "请上传 .csv 格式文件。" in html
|
|
|
|
|
|
def test_csv_import_preview_rejects_oversized_file(client, app) -> None:
    """Uploading 5 MiB + 1 byte to the preview endpoint shows the size-limit message."""
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        create_account(role="admin", **credentials)

    login(client, **credentials)
    # One byte past 5 * 1024 * 1024.
    too_large = io.BytesIO(b"a" * (5 * 1024 * 1024 + 1))
    response = client.post(
        "/csv/households/import/preview",
        data={"file": (too_large, "households.csv")},
        content_type="multipart/form-data",
        follow_redirects=True,
    )
    html = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "CSV 文件不能超过 5 MB。" in html
|
|
|
|
|
|
def test_csv_import_preview_reports_invalid_and_conflict_rows(client, app) -> None:
    """The preview page reports an update-by-code row, a suspected conflict and an invalid row.

    The upload has three data rows: one reusing the existing code ``A001``,
    one with a new code but the same name and phone as the existing household,
    and one whose ``invite_status`` is ``broken``.
    """
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        create_account(role="admin", **credentials)
        create_household(household_code="A001", head_name="王大爷", side="groom_side", phone="13800001111")

    login(client, **credentials)
    header = "household_code,head_name,phone,side,invite_status,attendance_status,expected_attendee_count,actual_attendee_count,child_count,red_packet_child_count,total_gift_amount,gift_method_option_code,gift_scene_option_code,favor_status,candy_status,child_red_packet_status,note"
    rows = [
        "A001,王大爷,13800001111,groom_side,invited,attending,3,3,0,0,200.00,,,given,given,not_given,按编码更新",
        "C003,王大爷,13800001111,groom_side,invited,attending,3,3,0,0,188.00,,,given,given,not_given,疑似重复",
        "D004,张叔叔,13900000000,groom_side,broken,attending,3,3,0,0,188.00,,,given,given,not_given,无效状态",
    ]
    content = "\n".join([header, *rows])
    response = client.post(
        "/csv/households/import/preview",
        data={"file": csv_file(content)},
        content_type="multipart/form-data",
        follow_redirects=True,
    )
    html = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "导入预览" in html
    assert "按户编码更新" in html
    assert "检测到疑似冲突" in html
    assert "邀请状态不合法。" in html
|
|
|
|
|
|
def test_csv_import_preview_rejects_household_code_used_by_deleted_household(client, app) -> None:
    """Previewing a row whose code belongs to a soft-deleted household shows the code-occupied message."""
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        create_account(role="admin", **credentials)
        removed = create_household(household_code="A001", head_name="已删除户", side="groom_side", phone="13800001111")
        # Soft-delete the household so its code is only held by a deleted record.
        removed.deleted_at = datetime(2026, 4, 12, 9, 0)
        db.session.commit()

    login(client, **credentials)
    header = "household_code,head_name,phone,side,invite_status,attendance_status,expected_attendee_count,actual_attendee_count,child_count,red_packet_child_count,total_gift_amount,gift_method_option_code,gift_scene_option_code,favor_status,candy_status,child_red_packet_status,note"
    row = "A001,新导入户,13900000000,groom_side,invited,pending,1,0,0,0,0.00,,,not_given,not_given,not_given,历史编号冲突"
    content = "\n".join([header, row])
    response = client.post(
        "/csv/households/import/preview",
        data={"file": csv_file(content)},
        content_type="multipart/form-data",
        follow_redirects=True,
    )
    html = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "户编码已被历史删除记录占用,请改用新的户编码。" in html
|
|
|
|
|
|
def test_csv_confirm_skips_deleted_code_collision_and_imports_other_valid_rows(client, app) -> None:
    """Confirming an import creates the valid row and not the one reusing a deleted code.

    The upload has one row reusing the code of a soft-deleted household and
    one new row. After preview and confirm (``update_by_code``), only the new
    row exists, and the audit log records one created and one invalid row.
    """
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        admin = create_account(role="admin", **credentials)
        removed = create_household(household_code="A001", head_name="已删除户", side="groom_side", phone="13800001111")
        removed.deleted_at = datetime(2026, 4, 12, 9, 0)
        db.session.commit()
        # Capture the id while the instance is still attached to the session.
        admin_id = admin.id

    login(client, **credentials)
    header = "household_code,head_name,phone,side,invite_status,attendance_status,expected_attendee_count,actual_attendee_count,child_count,red_packet_child_count,total_gift_amount,gift_method_option_code,gift_scene_option_code,favor_status,candy_status,child_red_packet_status,note"
    rows = [
        "A001,历史冲突户,13900000000,groom_side,invited,pending,1,0,0,0,0.00,,,not_given,not_given,not_given,应判定无效",
        "B002,有效新户,13900002222,bride_side,invited,pending,2,0,0,0,88.00,,,not_given,not_given,not_given,应被创建",
    ]
    content = "\n".join([header, *rows])

    preview_response = client.post(
        "/csv/households/import/preview",
        data={"file": csv_file(content)},
        content_type="multipart/form-data",
        follow_redirects=False,
    )
    assert preview_response.status_code == 302
    # The token is whatever follows the last "preview_token=" in the redirect URL.
    preview_token = preview_response.headers["Location"].rpartition("preview_token=")[2]

    confirm_payload = {"preview_token": preview_token, "conflict_mode": "update_by_code"}
    confirm_response = client.post(
        "/csv/households/import/confirm",
        data=confirm_payload,
        follow_redirects=False,
    )
    assert confirm_response.status_code == 302
    assert confirm_response.headers["Location"].endswith("/")

    with app.app_context():
        created_query = db.select(Household).where(Household.household_code == "B002")
        created = db.session.execute(created_query).scalar_one_or_none()
        reused_query = db.select(Household).where(
            Household.head_name == "历史冲突户",
            Household.deleted_at.is_(None),
        )
        invalid_reused = db.session.execute(reused_query).scalar_one_or_none()
        assert created is not None
        assert created.head_name == "有效新户"
        assert created.created_by == admin_id
        assert invalid_reused is None

        audit_query = db.select(AuditLog).where(AuditLog.action_type == "import_households_csv")
        audit_log = db.session.execute(audit_query).scalar_one()
        assert audit_log.after_data_json is not None
        assert audit_log.after_data_json["rows_created"] == 1
        assert audit_log.after_data_json["rows_invalid"] == 1
|
|
|
|
|
|
def test_csv_confirm_with_update_by_code_creates_and_updates_households(client, app) -> None:
    """Confirming with ``update_by_code`` updates the existing household and creates the new one.

    Also checks the ``updated_by`` / ``created_by`` stamps and the audit-log
    counters and conflict mode.
    """
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        admin = create_account(role="admin", **credentials)
        gift_method = create_option(option_group="gift_method", option_code="cash", option_label="现金")
        gift_scene = create_option(option_group="gift_scene", option_code="wedding_day", option_label="婚礼当天")
        existing_fields = {
            "household_code": "A001",
            "head_name": "王大爷",
            "side": "groom_side",
            "phone": "13800001111",
            "total_gift_amount": "100.00",
        }
        create_household(**existing_fields)
        # Read the plain values while the instances are still session-bound.
        admin_id = admin.id
        gift_method_code = gift_method.option_code
        gift_scene_code = gift_scene.option_code

    login(client, **credentials)
    header = "household_code,head_name,phone,side,invite_status,attendance_status,expected_attendee_count,actual_attendee_count,child_count,red_packet_child_count,total_gift_amount,gift_method_option_code,gift_scene_option_code,favor_status,candy_status,child_red_packet_status,note"
    update_row = f"A001,王大爷,13800001111,groom_side,confirmed,attending,4,4,1,1,520.00,{gift_method_code},{gift_scene_code},given,given,partial,更新后备注"
    create_row = f"B002,李阿姨,13900002222,bride_side,invited,pending,2,0,0,0,88.00,{gift_method_code},{gift_scene_code},not_given,not_given,not_given,新建备注"
    content = "\n".join([header, update_row, create_row])

    preview_response = client.post(
        "/csv/households/import/preview",
        data={"file": csv_file(content)},
        content_type="multipart/form-data",
        follow_redirects=False,
    )
    assert preview_response.status_code == 302
    # The token is whatever follows the last "preview_token=" in the redirect URL.
    preview_token = preview_response.headers["Location"].rpartition("preview_token=")[2]

    confirm_payload = {"preview_token": preview_token, "conflict_mode": "update_by_code"}
    confirm_response = client.post(
        "/csv/households/import/confirm",
        data=confirm_payload,
        follow_redirects=False,
    )
    assert confirm_response.status_code == 302
    assert confirm_response.headers["Location"].endswith("/")

    with app.app_context():
        updated_query = db.select(Household).where(Household.household_code == "A001")
        updated = db.session.execute(updated_query).scalar_one()
        created_query = db.select(Household).where(Household.household_code == "B002")
        created = db.session.execute(created_query).scalar_one()
        assert updated.total_gift_amount == Decimal("520.00")
        assert updated.note == "更新后备注"
        assert updated.updated_by == admin_id
        assert created.head_name == "李阿姨"
        assert created.note == "新建备注"
        assert created.created_by == admin_id

        audit_query = db.select(AuditLog).where(AuditLog.action_type == "import_households_csv")
        audit_log = db.session.execute(audit_query).scalar_one()
        assert audit_log.after_data_json is not None
        assert audit_log.after_data_json["rows_created"] == 1
        assert audit_log.after_data_json["rows_updated"] == 1
        assert audit_log.after_data_json["conflict_mode"] == "update_by_code"
|
|
|
|
|
|
def test_csv_confirm_with_skip_conflicts_only_creates_new_rows(client, app) -> None:
    """Confirming with ``skip_conflicts`` leaves the existing household untouched and creates the new one.

    Also checks the audit-log counters (created, updated, skipped) and the
    recorded conflict mode.
    """
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        create_account(role="admin", **credentials)
        existing_fields = {
            "household_code": "A001",
            "head_name": "王大爷",
            "side": "groom_side",
            "phone": "13800001111",
            "total_gift_amount": "100.00",
            "note": "原备注",
        }
        create_household(**existing_fields)

    login(client, **credentials)
    header = "household_code,head_name,phone,side,invite_status,attendance_status,expected_attendee_count,actual_attendee_count,child_count,red_packet_child_count,total_gift_amount,gift_method_option_code,gift_scene_option_code,favor_status,candy_status,child_red_packet_status,note"
    rows = [
        "A001,王大爷,13800001111,groom_side,confirmed,attending,4,4,1,1,520.00,,,given,given,partial,应被跳过",
        "B002,李阿姨,13900002222,bride_side,invited,pending,2,0,0,0,88.00,,,not_given,not_given,not_given,应被创建",
    ]
    content = "\n".join([header, *rows])

    preview_response = client.post(
        "/csv/households/import/preview",
        data={"file": csv_file(content)},
        content_type="multipart/form-data",
        follow_redirects=False,
    )
    assert preview_response.status_code == 302
    # The token is whatever follows the last "preview_token=" in the redirect URL.
    preview_token = preview_response.headers["Location"].rpartition("preview_token=")[2]

    confirm_payload = {"preview_token": preview_token, "conflict_mode": "skip_conflicts"}
    confirm_response = client.post(
        "/csv/households/import/confirm",
        data=confirm_payload,
        follow_redirects=False,
    )
    assert confirm_response.status_code == 302
    assert confirm_response.headers["Location"].endswith("/")

    with app.app_context():
        existing_query = db.select(Household).where(Household.household_code == "A001")
        existing = db.session.execute(existing_query).scalar_one()
        created_query = db.select(Household).where(Household.household_code == "B002")
        created = db.session.execute(created_query).scalar_one_or_none()
        assert existing.total_gift_amount == Decimal("100.00")
        assert existing.note == "原备注"
        assert created is not None
        assert created.note == "应被创建"

        audit_query = db.select(AuditLog).where(AuditLog.action_type == "import_households_csv")
        audit_log = db.session.execute(audit_query).scalar_one()
        assert audit_log.after_data_json is not None
        assert audit_log.after_data_json["rows_created"] == 1
        assert audit_log.after_data_json["rows_updated"] == 0
        assert audit_log.after_data_json["rows_skipped"] == 1
        assert audit_log.after_data_json["conflict_mode"] == "skip_conflicts"
|
|
|
|
|
|
def test_csv_confirm_with_missing_preview_fails_safely(client, app) -> None:
    """Confirming with an unknown preview token ends on a page showing the expired-preview message."""
    credentials = {"username": "admin", "password": "AdminPass123!"}
    with app.app_context():
        create_account(role="admin", **credentials)

    login(client, **credentials)
    confirm_payload = {"preview_token": "missing-preview-token", "conflict_mode": "skip_conflicts"}
    response = client.post(
        "/csv/households/import/confirm",
        data=confirm_payload,
        follow_redirects=True,
    )
    html = response.get_data(as_text=True)

    assert response.status_code == 200
    assert "导入预览已失效,请重新上传 CSV。" in html
|
|
|