You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
320 lines
12 KiB
320 lines
12 KiB
from __future__ import annotations
|
|
|
|
from decimal import Decimal
|
|
|
|
import pytest
|
|
|
|
from app import create_app
|
|
from app.extensions import db
|
|
from app.models import Account, AuditLog, Household, OptionItem
|
|
|
|
|
|
@pytest.fixture
def app():
    """Yield an application built with ``create_app("testing")``.

    The schema is created with ``db.create_all()`` before the test body runs.
    After the test resumes the fixture, the session is removed and all tables
    are dropped.  The application context stays pushed for the whole time.
    """
    flask_app = create_app("testing")

    with flask_app.app_context():
        db.create_all()
        yield flask_app
        # Teardown: release the scoped session before dropping the tables.
        db.session.remove()
        db.drop_all()
|
|
|
|
|
|
@pytest.fixture
def client(app):
    """Return a test client bound to the ``app`` fixture's application."""
    test_client = app.test_client()
    return test_client
|
|
|
|
|
|
def create_account(
    *,
    username: str,
    password: str,
    role: str,
    status: str = "active",
    display_name: str | None = None,
) -> Account:
    """Create, commit and return an ``Account`` for use in a test.

    The password is not passed to the constructor; it is applied through
    ``Account.set_password`` before the row is added to the session.
    """
    new_account = Account(
        username=username,
        role=role,
        status=status,
        display_name=display_name,
    )
    new_account.set_password(password)

    session = db.session
    session.add(new_account)
    session.commit()
    return new_account
|
|
|
|
|
|
def create_option(
    *,
    option_group: str,
    option_code: str,
    option_label: str,
    parent_id: int | None = None,
    sort_order: int = 10,
    is_enabled: bool = True,
    is_system: bool = True,
) -> OptionItem:
    """Create, commit and return an ``OptionItem`` for use in a test.

    Args:
        option_group: Value stored in ``OptionItem.option_group``.
        option_code: Value stored in ``OptionItem.option_code``.
        option_label: Value stored in ``OptionItem.option_label``.
        parent_id: Optional value for ``OptionItem.parent_id``.
        sort_order: Value for ``OptionItem.sort_order``; defaults to 10.
        is_enabled: Value for ``OptionItem.is_enabled``; defaults to True.
        is_system: Value for ``OptionItem.is_system``; defaults to True.

    Returns:
        The committed ``OptionItem`` instance.
    """
    option = OptionItem(
        option_group=option_group,
        option_code=option_code,
        option_label=option_label,
        parent_id=parent_id,
        sort_order=sort_order,
        is_enabled=is_enabled,
        is_system=is_system,
    )
    db.session.add(option)
    db.session.commit()
    return option
|
|
|
|
|
|
def create_household(**overrides: object) -> Household:
    """Create, commit and return a ``Household`` for use in a test.

    Each field listed below has a built-in default; pass a keyword with the
    field's name to replace it.  Defaults and overrides are coerced the same
    way: text fields through ``str``, the counters and ``version`` through
    ``int(str(...))``, ``total_gift_amount`` through ``Decimal(str(...))``.
    ``note`` is passed through unchanged (``None`` when absent).  Keywords
    that are not listed below are ignored.
    """
    pick = overrides.get

    def as_text(field: str, fallback: str) -> str:
        return str(pick(field, fallback))

    def as_count(field: str, fallback: int) -> int:
        # int(str(...)) lets an ``object``-typed override reach int();
        # presumably there to keep the type checker satisfied.
        return int(str(pick(field, fallback)))

    field_values = {
        "household_code": as_text("household_code", "H001"),
        "head_name": as_text("head_name", "王大爷"),
        "phone": as_text("phone", "13800001111"),
        "side": as_text("side", "groom_side"),
        "attendance_status": as_text("attendance_status", "pending"),
        "invite_status": as_text("invite_status", "invited"),
        "expected_attendee_count": as_count("expected_attendee_count", 3),
        "actual_attendee_count": as_count("actual_attendee_count", 0),
        "child_count": as_count("child_count", 0),
        "red_packet_child_count": as_count("red_packet_child_count", 0),
        "total_gift_amount": Decimal(str(pick("total_gift_amount", "0.00"))),
        "favor_status": as_text("favor_status", "not_given"),
        "candy_status": as_text("candy_status", "not_given"),
        "child_red_packet_status": as_text("child_red_packet_status", "not_given"),
        "note": pick("note", None),
        "version": as_count("version", 1),
    }

    household = Household(**field_values)
    session = db.session
    session.add(household)
    session.commit()
    return household
|
|
|
|
|
|
def login(client, *, username: str, password: str):
    """POST the credentials as form data to ``/auth/login``.

    Redirects are not followed; the response of the POST itself is returned.
    """
    credentials = {"username": username, "password": password}
    return client.post("/auth/login", data=credentials, follow_redirects=False)
|
|
|
|
|
|
def test_admin_accounts_page_renders_for_admin(client, app) -> None:
    """GET /admin/accounts as an admin returns 200 and the expected page text."""
    with app.app_context():
        create_account(username="admin", password="AdminPass123!", role="admin", display_name="管理员")
        create_account(username="editor-user", password="EditorPass123!", role="editor", display_name="录入员")

    login(client, username="admin", password="AdminPass123!")
    listing = client.get("/admin/accounts")
    html = listing.get_data(as_text=True)

    assert listing.status_code == 200
    # The page text, the second account's username and the create-entry label.
    for expected_text in ("账号管理", "editor-user", "新增账号"):
        assert expected_text in html
|
|
|
|
|
|
def test_admin_account_form_includes_quick_editor_role_option(client, app) -> None:
    """The new-account form offers an ``<option>`` whose value is ``quick_editor``."""
    with app.app_context():
        create_account(username="admin", password="AdminPass123!", role="admin", display_name="管理员")

    login(client, username="admin", password="AdminPass123!")
    form_response = client.get("/admin/accounts/new")
    html = form_response.get_data(as_text=True)

    assert form_response.status_code == 200
    expected_option_prefix = '<option value="quick_editor"'
    assert expected_option_prefix in html
|
|
|
|
|
|
def test_admin_accounts_page_rejects_editor(client, app) -> None:
    """An editor requesting /admin/accounts gets a 302 whose Location ends with "/"."""
    with app.app_context():
        create_account(username="editor", password="EditorPass123!", role="editor")

    login(client, username="editor", password="EditorPass123!")
    rejected = client.get("/admin/accounts", follow_redirects=False)

    assert rejected.status_code == 302
    redirect_target = rejected.headers["Location"]
    assert redirect_target.endswith("/")
|
|
|
|
|
|
def test_admin_rejects_weak_password_on_account_create(client, app) -> None:
    """Creating an account with the password "weakpass" yields 400 and no new row."""
    with app.app_context():
        create_account(username="admin", password="AdminPass123!", role="admin")

    login(client, username="admin", password="AdminPass123!")

    weak_form = {
        "username": "weak-user",
        "display_name": "弱密码账号",
        "role": "entry_only",
        "password": "weakpass",
        "confirm_password": "weakpass",
    }
    rejected = client.post("/admin/accounts/new", data=weak_form, follow_redirects=True)
    html = rejected.get_data(as_text=True)

    assert rejected.status_code == 400
    assert "初始密码需同时包含大写字母、小写字母和数字" in html

    # The rejected submission must not have been persisted.
    with app.app_context():
        lookup = db.select(Account).where(Account.username == "weak-user")
        persisted = db.session.execute(lookup).scalar_one_or_none()
        assert persisted is None
|
|
|
|
|
|
def test_admin_can_create_account_and_write_audit_log(client, app) -> None:
    """A valid create request redirects, stores the account and logs ``create_account`` once."""
    with app.app_context():
        creator = create_account(username="admin", password="AdminPass123!", role="admin")
        # Capture the primary key while the context block is still open.
        admin_account_id = creator.id

    login(client, username="admin", password="AdminPass123!")

    new_account_form = {
        "username": "entry-user",
        "display_name": "长辈录入",
        "role": "entry_only",
        "password": "EntryPass123!",
        "confirm_password": "EntryPass123!",
    }
    created_response = client.post("/admin/accounts/new", data=new_account_form, follow_redirects=False)

    assert created_response.status_code == 302
    assert created_response.headers["Location"].endswith("/admin/accounts")

    with app.app_context():
        account_query = db.select(Account).where(Account.username == "entry-user")
        stored = db.session.execute(account_query).scalar_one_or_none()
        assert stored is not None
        assert stored.display_name == "长辈录入"
        assert stored.role == "entry_only"
        assert stored.created_by == admin_account_id

        log_query = db.select(AuditLog).where(AuditLog.action_type == "create_account")
        audit_entries = db.session.execute(log_query).scalars().all()
        assert len(audit_entries) == 1
        entry = audit_entries[0]
        assert entry.target_id == stored.id
        assert entry.actor_user_id == admin_account_id
        assert entry.after_data_json is not None
        assert entry.after_data_json["username"] == "entry-user"
|
|
|
|
|
|
def test_admin_can_edit_account_and_toggle_status(client, app) -> None:
    """Editing then toggling an account updates the row and logs both actions."""
    with app.app_context():
        acting_admin = create_account(username="admin", password="AdminPass123!", role="admin")
        edited_account = create_account(username="target", password="TargetPass123!", role="entry_only")
        target_account_id = edited_account.id
        admin_account_id = acting_admin.id

    login(client, username="admin", password="AdminPass123!")

    edit_url = f"/admin/accounts/{target_account_id}/edit"
    edit_form = {"display_name": "新的显示名", "role": "editor"}
    edit_response = client.post(edit_url, data=edit_form, follow_redirects=False)

    assert edit_response.status_code == 302
    assert edit_response.headers["Location"].endswith("/admin/accounts")

    toggle_url = f"/admin/accounts/{target_account_id}/toggle-status"
    toggle_response = client.post(toggle_url, follow_redirects=False)
    assert toggle_response.status_code == 302
    assert toggle_response.headers["Location"].endswith("/admin/accounts")

    with app.app_context():
        reloaded = db.session.get(Account, target_account_id)
        assert reloaded is not None
        assert reloaded.display_name == "新的显示名"
        assert reloaded.role == "editor"
        # The account was created with the default "active" status, then toggled once.
        assert reloaded.status == "disabled"
        assert reloaded.updated_by == admin_account_id

        action_query = (
            db.select(AuditLog.action_type)
            .where(AuditLog.target_id == target_account_id)
            .order_by(AuditLog.id.asc())
        )
        recorded_actions = db.session.execute(action_query).scalars().all()
        assert "update_account" in recorded_actions
        assert "toggle_account_status" in recorded_actions
|
|
|
|
|
|
def test_admin_rejects_weak_password_on_reset(client, app) -> None:
    """Resetting to the password "weakpass" yields 400 and keeps the old password."""
    with app.app_context():
        create_account(username="admin", password="AdminPass123!", role="admin")
        victim = create_account(username="target", password="OldPass123!", role="editor")
        target_account_id = victim.id

    login(client, username="admin", password="AdminPass123!")

    reset_url = f"/admin/accounts/{target_account_id}/reset-password"
    weak_form = {"password": "weakpass", "confirm_password": "weakpass"}
    rejected = client.post(reset_url, data=weak_form, follow_redirects=True)
    html = rejected.get_data(as_text=True)

    assert rejected.status_code == 400
    assert "新密码需同时包含大写字母、小写字母和数字" in html

    with app.app_context():
        reloaded = db.session.get(Account, target_account_id)
        assert reloaded is not None
        # The original password must still verify.
        assert reloaded.check_password("OldPass123!") is True
|
|
|
|
|
|
def test_admin_can_reset_password(client, app) -> None:
    """A valid reset redirects, makes the new password verify and logs ``reset_password`` once."""
    with app.app_context():
        create_account(username="admin", password="AdminPass123!", role="admin")
        subject = create_account(username="target", password="OldPass123!", role="editor")
        target_account_id = subject.id

    login(client, username="admin", password="AdminPass123!")

    reset_url = f"/admin/accounts/{target_account_id}/reset-password"
    new_password_form = {"password": "NewPass123!", "confirm_password": "NewPass123!"}
    reset_response = client.post(reset_url, data=new_password_form, follow_redirects=False)

    assert reset_response.status_code == 302
    assert reset_response.headers["Location"].endswith("/admin/accounts")

    with app.app_context():
        reloaded = db.session.get(Account, target_account_id)
        assert reloaded is not None
        assert reloaded.check_password("NewPass123!") is True

        log_query = db.select(AuditLog).where(AuditLog.action_type == "reset_password")
        audit_entries = db.session.execute(log_query).scalars().all()
        assert len(audit_entries) == 1
        assert audit_entries[0].target_id == target_account_id
|
|
|
|
|
|
def test_admin_cannot_disable_current_account(client, app) -> None:
    """Toggling the logged-in admin's own status shows a message and leaves it active."""
    with app.app_context():
        current_admin = create_account(username="admin", password="AdminPass123!", role="admin")
        admin_account_id = current_admin.id

    login(client, username="admin", password="AdminPass123!")

    toggle_url = f"/admin/accounts/{admin_account_id}/toggle-status"
    # Redirects are followed so the final page text can be inspected.
    final_response = client.post(toggle_url, follow_redirects=True)
    html = final_response.get_data(as_text=True)

    assert final_response.status_code == 200
    assert "不能停用当前正在使用的管理员账号" in html

    with app.app_context():
        reloaded = db.session.get(Account, admin_account_id)
        assert reloaded is not None
        assert reloaded.status == "active"
|
|
|
|
|
|
def test_audit_logs_page_supports_filters_and_detail(client, app) -> None:
    """The audit-log list accepts filter query parameters and the detail page renders."""
    with app.app_context():
        actor = create_account(username="admin", password="AdminPass123!", role="admin", display_name="管理员")
        household = create_household()

        # Seed one audit entry directly, since no request has produced one yet.
        seeded_entry = AuditLog(
            actor_user_id=actor.id,
            actor_username=actor.username,
            actor_display_name=actor.display_name,
            action_type="update_household",
            target_type="household",
            target_id=household.id,
            target_display_name=household.head_name,
            before_data_json={"head_name": "旧姓名"},
            after_data_json={"head_name": household.head_name},
        )
        db.session.add(seeded_entry)
        db.session.commit()

        # Keep plain ids for use after the context block closes.
        log_id = seeded_entry.id
        actor_id = actor.id

    login(client, username="admin", password="AdminPass123!")

    list_url = f"/admin/audit-logs?actor_user_id={actor_id}&action_type=update_household"
    list_response = client.get(list_url)
    list_html = list_response.get_data(as_text=True)

    assert list_response.status_code == 200
    for expected_text in ("审计日志", "update_household", "管理员"):
        assert expected_text in list_html

    detail_response = client.get(f"/admin/audit-logs/{log_id}")
    detail_page = detail_response.get_data(as_text=True)

    assert detail_response.status_code == 200
    for expected_text in ("审计详情", "Before 快照", "After 快照"):
        assert expected_text in detail_page
|
|
|