feat: add system quality improvement plan and implementation

This commit is contained in:
张翔
2026-03-12 18:20:50 +08:00
parent c8646974d8
commit fe2e4110dd
238 changed files with 21864 additions and 2026 deletions
+218
View File
@@ -0,0 +1,218 @@
"""
审计日志测试用例
"""
import pytest
import time
from api.audit_api import SysLogAPI
@pytest.mark.audit
@pytest.mark.regression
class TestLoginLog:
"""登录日志测试类"""
@pytest.mark.asyncio
async def test_create_login_log(self, authenticated_client):
"""测试创建登录日志"""
api = SysLogAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"username": f"testuser_{timestamp}",
"ip": "127.0.0.1",
"loginLocation": "本地",
"browser": "Chrome",
"os": "Mac OS",
"status": "0",
"msg": "登录成功"
}
response = await api.create_login_log(data)
assert response.status_code == 201
result = response.json()
assert result["username"] == data["username"]
@pytest.mark.asyncio
async def test_get_all_login_logs(self, authenticated_client):
"""测试获取所有登录日志"""
api = SysLogAPI(authenticated_client)
response = await api.get_login_logs()
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_get_login_log_by_id(self, authenticated_client):
"""测试根据ID获取登录日志"""
api = SysLogAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"username": f"testuser_{timestamp}",
"ip": "127.0.0.1",
"status": "0",
"msg": "登录成功"
}
create_response = await api.create_login_log(data)
log_id = create_response.json()["id"]
response = await api.get_login_log_by_id(log_id)
assert response.status_code == 200
assert response.json()["id"] == log_id
@pytest.mark.audit
@pytest.mark.regression
class TestExceptionLog:
"""异常日志测试类"""
@pytest.mark.asyncio
async def test_create_exception_log(self, authenticated_client):
"""测试创建异常日志"""
api = SysLogAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"title": f"测试异常_{timestamp}",
"exceptionName": "NullPointerException",
"exceptionMsg": "Null pointer at line 100",
"methodName": "cn.novalon.manage.sys.service.UserService.getUser",
"ip": "127.0.0.1",
"exceptionStack": "java.lang.NullPointerException\\n at..."
}
response = await api.create_exception_log(data)
assert response.status_code == 201
result = response.json()
assert result["title"] == data["title"]
@pytest.mark.asyncio
async def test_get_all_exception_logs(self, authenticated_client):
"""测试获取所有异常日志"""
api = SysLogAPI(authenticated_client)
response = await api.get_exception_logs()
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_get_exception_log_by_id(self, authenticated_client):
"""测试根据ID获取异常日志"""
api = SysLogAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"title": f"测试异常_{timestamp}",
"exceptionName": "NullPointerException",
"exceptionMsg": "Null pointer"
}
create_response = await api.create_exception_log(data)
log_id = create_response.json()["id"]
response = await api.get_exception_log_by_id(log_id)
assert response.status_code == 200
assert response.json()["id"] == log_id
@pytest.mark.asyncio
async def test_get_login_logs_by_page_success(self, authenticated_client):
"""测试分页获取登录日志成功"""
api = SysLogAPI(authenticated_client)
for i in range(5):
timestamp = int(time.time() * 1000) + i
data = {
"username": f"testuser_{i}",
"ip": f"127.0.0.{i}",
"status": "0",
"msg": "登录成功"
}
await api.create_login_log(data)
response = await api.get_login_logs_by_page(page=0, size=10)
assert response.status_code == 200
data = response.json()
assert "content" in data
assert "totalElements" in data
assert "totalPages" in data
assert "currentPage" in data
assert "pageSize" in data
assert len(data["content"]) <= 10
@pytest.mark.asyncio
async def test_get_login_logs_by_page_with_sort(self, authenticated_client):
"""测试分页获取登录日志并排序成功"""
api = SysLogAPI(authenticated_client)
for i in range(3):
timestamp = int(time.time() * 1000) + i
data = {
"username": f"sortuser_{i}",
"ip": "127.0.0.1",
"status": "0",
"msg": "登录成功"
}
await api.create_login_log(data)
response = await api.get_login_logs_by_page(page=0, size=10, sort="username", order="asc")
assert response.status_code == 200
data = response.json()
usernames = [log["username"] for log in data["content"]]
assert usernames == sorted(usernames)
@pytest.mark.asyncio
async def test_get_login_logs_by_page_with_search(self, authenticated_client):
"""测试分页获取登录日志并搜索成功"""
api = SysLogAPI(authenticated_client)
timestamp1 = int(time.time() * 1000)
data1 = {
"username": "search_test_user",
"ip": "127.0.0.1",
"status": "0",
"msg": "登录成功"
}
await api.create_login_log(data1)
timestamp2 = int(time.time() * 1000) + 1
data2 = {
"username": "other_user",
"ip": "127.0.0.2",
"status": "0",
"msg": "登录成功"
}
await api.create_login_log(data2)
response = await api.get_login_logs_by_page(page=0, size=10, keyword="search")
assert response.status_code == 200
data = response.json()
assert len(data["content"]) >= 1
assert all("search" in log["username"] or "search" in log.get("ip", "")
for log in data["content"])
@pytest.mark.asyncio
async def test_get_login_log_count_success(self, authenticated_client):
"""测试获取登录日志总数成功"""
api = SysLogAPI(authenticated_client)
initial_count_response = await api.get_login_log_count()
initial_count = initial_count_response.json()
timestamp = int(time.time() * 1000)
data = {
"username": f"count_test_user",
"ip": "127.0.0.1",
"status": "0",
"msg": "登录成功"
}
await api.create_login_log(data)
final_count_response = await api.get_login_log_count()
final_count = final_count_response.json()
assert final_count == initial_count + 1
+26 -31
View File
@@ -20,10 +20,10 @@ class TestAuth:
assert response.status_code == 200
data = response.json()
assert "accessToken" in data
assert "refreshToken" in data
assert isinstance(data["accessToken"], str)
assert isinstance(data["refreshToken"], str)
assert "token" in data
assert isinstance(data["token"], str)
assert "userId" in data
assert "username" in data
@pytest.mark.asyncio
async def test_login_invalid_credentials(self, http_client):
@@ -44,40 +44,35 @@ class TestAuth:
assert response.status_code == 400
@pytest.mark.asyncio
async def test_refresh_token_success(self, http_client, auth_token):
"""测试刷新token成功"""
auth_api = AuthAPI(http_client)
async def test_register_success(self, http_client):
"""测试注册成功"""
import time
timestamp = int(time.time() * 1000)
response = await http_client.post("/api/auth/register", json={
"username": f"testuser_{timestamp}",
"password": "password123",
"email": f"test_{timestamp}@example.com"
})
login_response = await auth_api.login(settings.TEST_USERNAME, settings.TEST_PASSWORD)
refresh_token = login_response.json().get("refreshToken")
response = await auth_api.refresh_token(refresh_token)
assert response.status_code == 200
assert response.status_code == 201
data = response.json()
assert "accessToken" in data
assert "refreshToken" in data
assert "id" in data
assert data["username"] == f"testuser_{timestamp}"
@pytest.mark.asyncio
async def test_refresh_token_invalid(self, http_client):
"""测试无效刷新token"""
auth_api = AuthAPI(http_client)
response = await auth_api.refresh_token("invalid_refresh_token")
async def test_register_duplicate_username(self, http_client):
"""测试注册重复用户名"""
response = await http_client.post("/api/auth/register", json={
"username": "admin",
"password": "password123",
"email": "admin@example.com"
})
assert response.status_code == 401
assert response.status_code == 500
@pytest.mark.asyncio
async def test_logout_success(self, http_client, auth_token):
async def test_logout_success(self, http_client):
"""测试登出成功"""
auth_api = AuthAPI(http_client)
response = await auth_api.logout(auth_token)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_logout_without_token(self, http_client):
"""测试无token登出"""
auth_api = AuthAPI(http_client)
response = await http_client.post("/api/auth/logout")
assert response.status_code == 400
assert response.status_code == 200
+105
View File
@@ -0,0 +1,105 @@
"""
系统配置测试用例
"""
import pytest
import time
from api.config_api import SysConfigAPI
@pytest.mark.config
@pytest.mark.regression
class TestSysConfig:
"""系统参数配置测试类"""
@pytest.mark.asyncio
async def test_create_config_success(self, authenticated_client):
"""测试创建系统配置成功"""
api = SysConfigAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"configName": f"测试配置_{timestamp}",
"configKey": f"test.config.key.{timestamp}",
"configValue": "test_value",
"configType": "N"
}
response = await api.create(data)
assert response.status_code == 201
result = response.json()
assert result["configName"] == data["configName"]
assert result["configKey"] == data["configKey"]
@pytest.mark.asyncio
async def test_get_all_configs(self, authenticated_client):
"""测试获取所有配置"""
api = SysConfigAPI(authenticated_client)
response = await api.get_all()
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_get_config_by_key(self, authenticated_client):
"""测试根据key获取配置"""
api = SysConfigAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"configName": f"测试配置_{timestamp}",
"configKey": f"test.config.key.{timestamp}",
"configValue": "test_value",
"configType": "N"
}
create_response = await api.create(data)
config_key = data["configKey"]
response = await api.get_by_key(config_key)
assert response.status_code == 200
result = response.json()
assert result["configKey"] == config_key
@pytest.mark.asyncio
async def test_update_config(self, authenticated_client):
"""测试更新配置"""
api = SysConfigAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"configName": f"测试配置_{timestamp}",
"configKey": f"test.config.key.{timestamp}",
"configValue": "old_value",
"configType": "N"
}
create_response = await api.create(data)
config_id = create_response.json()["id"]
update_data = {
"configName": f"更新后_{timestamp}",
"configKey": f"test.config.key.{timestamp}",
"configValue": "new_value",
"configType": "N"
}
response = await api.update(config_id, update_data)
assert response.status_code == 200
assert response.json()["configValue"] == "new_value"
@pytest.mark.asyncio
async def test_delete_config(self, authenticated_client):
"""测试删除配置"""
api = SysConfigAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"configName": f"测试配置_{timestamp}",
"configKey": f"test.config.key.{timestamp}",
"configValue": "test_value",
"configType": "N"
}
create_response = await api.create(data)
config_id = create_response.json()["id"]
response = await api.delete(config_id)
assert response.status_code == 204
+164
View File
@@ -0,0 +1,164 @@
"""
字典管理测试用例
"""
import pytest
import time
from api.dict_api import DictTypeAPI, DictDataAPI
@pytest.mark.dict
@pytest.mark.regression
class TestDictType:
"""字典类型测试类"""
@pytest.mark.asyncio
async def test_create_dict_type_success(self, authenticated_client):
"""测试创建字典类型成功"""
api = DictTypeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"dictName": f"测试字典_{timestamp}",
"dictType": f"test_{timestamp}",
"status": "0"
}
response = await api.create(data)
assert response.status_code == 201
result = response.json()
assert result["dictName"] == data["dictName"]
assert result["dictType"] == data["dictType"]
@pytest.mark.asyncio
async def test_get_all_dict_types(self, authenticated_client):
"""测试获取所有字典类型"""
api = DictTypeAPI(authenticated_client)
response = await api.get_all()
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_get_dict_type_by_id(self, authenticated_client):
"""测试根据ID获取字典类型"""
api = DictTypeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
create_data = {
"dictName": f"测试字典_{timestamp}",
"dictType": f"test_{timestamp}",
"status": "0"
}
create_response = await api.create(create_data)
dict_id = create_response.json()["id"]
response = await api.get_by_id(dict_id)
assert response.status_code == 200
assert response.json()["id"] == dict_id
@pytest.mark.asyncio
async def test_update_dict_type(self, authenticated_client):
"""测试更新字典类型"""
api = DictTypeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
create_data = {
"dictName": f"测试字典_{timestamp}",
"dictType": f"test_{timestamp}",
"status": "0"
}
create_response = await api.create(create_data)
dict_id = create_response.json()["id"]
update_data = {
"dictName": f"更新后_{timestamp}",
"dictType": f"test_{timestamp}",
"status": "0"
}
response = await api.update(dict_id, update_data)
assert response.status_code == 200
assert response.json()["dictName"] == f"更新后_{timestamp}"
@pytest.mark.asyncio
async def test_delete_dict_type(self, authenticated_client):
"""测试删除字典类型"""
api = DictTypeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
create_data = {
"dictName": f"测试字典_{timestamp}",
"dictType": f"test_{timestamp}",
"status": "0"
}
create_response = await api.create(create_data)
dict_id = create_response.json()["id"]
response = await api.delete(dict_id)
assert response.status_code == 204
@pytest.mark.dict
@pytest.mark.regression
class TestDictData:
"""字典数据测试类"""
@pytest.mark.asyncio
async def test_create_dict_data_success(self, authenticated_client):
"""测试创建字典数据成功"""
dict_type_api = DictTypeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
dict_type_data = {
"dictName": f"测试字典类型_{timestamp}",
"dictType": f"test_type_{timestamp}",
"status": "0"
}
dict_type_response = await dict_type_api.create(dict_type_data)
dict_type_id = dict_type_response.json()["id"]
dict_data_api = DictDataAPI(authenticated_client)
data = {
"dictSort": 1,
"dictLabel": f"测试标签_{timestamp}",
"dictValue": f"test_value_{timestamp}",
"dictType": f"test_type_{timestamp}",
"status": "0"
}
response = await dict_data_api.create(data)
assert response.status_code == 201
result = response.json()
assert result["dictLabel"] == data["dictLabel"]
assert result["dictValue"] == data["dictValue"]
@pytest.mark.asyncio
async def test_get_dict_data_by_type(self, authenticated_client):
"""测试根据类型获取字典数据"""
dict_type_api = DictTypeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
dict_type = f"test_type_{timestamp}"
dict_type_data = {
"dictName": f"测试字典类型_{timestamp}",
"dictType": dict_type,
"status": "0"
}
await dict_type_api.create(dict_type_data)
dict_data_api = DictDataAPI(authenticated_client)
data = {
"dictSort": 1,
"dictLabel": f"测试标签_{timestamp}",
"dictValue": f"test_value_{timestamp}",
"dictType": dict_type,
"status": "0"
}
await dict_data_api.create(data)
response = await dict_data_api.get_by_type(dict_type)
assert response.status_code == 200
result = response.json()
assert len(result) > 0
assert result[0]["dictType"] == dict_type
+114
View File
@@ -0,0 +1,114 @@
"""
文件管理测试用例
"""
import pytest
import os
import time
from api.file_api import SysFileAPI
@pytest.mark.file
@pytest.mark.regression
class TestSysFile:
"""文件管理测试类"""
@pytest.mark.asyncio
async def test_upload_file(self, authenticated_client):
"""测试文件上传"""
api = SysFileAPI(authenticated_client)
test_file_path = "/tmp/test_file.txt"
with open(test_file_path, "w") as f:
f.write("This is a test file content")
response = await api.upload(test_file_path, "test_user")
os.remove(test_file_path)
assert response.status_code == 201
result = response.json()
assert "id" in result
@pytest.mark.asyncio
async def test_get_all_files(self, authenticated_client):
"""测试获取所有文件"""
api = SysFileAPI(authenticated_client)
response = await api.get_all()
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_get_file_by_id(self, authenticated_client):
"""测试根据ID获取文件"""
api = SysFileAPI(authenticated_client)
test_file_path = "/tmp/test_file.txt"
with open(test_file_path, "w") as f:
f.write("Test content")
upload_response = await api.upload(test_file_path, "test_user")
file_id = upload_response.json()["id"]
os.remove(test_file_path)
response = await api.get_by_id(file_id)
assert response.status_code == 200
assert response.json()["id"] == file_id
@pytest.mark.asyncio
async def test_download_file(self, authenticated_client):
"""测试文件下载"""
api = SysFileAPI(authenticated_client)
test_file_path = "/tmp/test_file.txt"
with open(test_file_path, "w") as f:
f.write("Download test content")
upload_response = await api.upload(test_file_path, "test_user")
file_name = upload_response.json()["filePath"].split("/")[-1]
os.remove(test_file_path)
response = await api.download(file_name)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_preview_file(self, authenticated_client):
"""测试文件预览"""
api = SysFileAPI(authenticated_client)
test_file_path = "/tmp/test_file.txt"
with open(test_file_path, "w") as f:
f.write("Preview test content")
upload_response = await api.upload(test_file_path, "test_user")
file_name = upload_response.json()["filePath"].split("/")[-1]
os.remove(test_file_path)
response = await api.preview(file_name)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_delete_file(self, authenticated_client):
"""测试删除文件"""
api = SysFileAPI(authenticated_client)
test_file_path = "/tmp/test_file.txt"
with open(test_file_path, "w") as f:
f.write("Delete test content")
upload_response = await api.upload(test_file_path, "test_user")
file_id = upload_response.json()["id"]
os.remove(test_file_path)
response = await api.delete(file_id)
assert response.status_code == 204
+184
View File
@@ -0,0 +1,184 @@
"""
通知公告测试用例
"""
import pytest
import time
from api.notice_api import SysNoticeAPI, SysMessageAPI
@pytest.mark.notice
@pytest.mark.regression
class TestSysNotice:
"""系统公告测试类"""
@pytest.mark.asyncio
async def test_create_notice_success(self, authenticated_client):
"""测试创建公告成功"""
api = SysNoticeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"noticeTitle": f"测试公告_{timestamp}",
"noticeType": "1",
"noticeContent": "这是测试公告内容",
"status": "0"
}
response = await api.create(data)
assert response.status_code == 201
result = response.json()
assert result["noticeTitle"] == data["noticeTitle"]
@pytest.mark.asyncio
async def test_get_all_notices(self, authenticated_client):
"""测试获取所有公告"""
api = SysNoticeAPI(authenticated_client)
response = await api.get_all()
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_get_notice_by_id(self, authenticated_client):
"""测试根据ID获取公告"""
api = SysNoticeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"noticeTitle": f"测试公告_{timestamp}",
"noticeType": "1",
"noticeContent": "测试内容",
"status": "0"
}
create_response = await api.create(data)
notice_id = create_response.json()["id"]
response = await api.get_by_id(notice_id)
assert response.status_code == 200
assert response.json()["id"] == notice_id
@pytest.mark.asyncio
async def test_get_notice_by_status(self, authenticated_client):
"""测试根据状态获取公告"""
api = SysNoticeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"noticeTitle": f"测试公告_{timestamp}",
"noticeType": "1",
"noticeContent": "测试内容",
"status": "0"
}
await api.create(data)
response = await api.get_by_status("0")
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_update_notice(self, authenticated_client):
"""测试更新公告"""
api = SysNoticeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"noticeTitle": f"测试公告_{timestamp}",
"noticeType": "1",
"noticeContent": "原始内容",
"status": "0"
}
create_response = await api.create(data)
notice_id = create_response.json()["id"]
update_data = {
"noticeTitle": f"更新后_{timestamp}",
"noticeType": "1",
"noticeContent": "更新后内容",
"status": "0"
}
response = await api.update(notice_id, update_data)
assert response.status_code == 200
assert response.json()["noticeTitle"] == f"更新后_{timestamp}"
@pytest.mark.asyncio
async def test_delete_notice(self, authenticated_client):
"""测试删除公告"""
api = SysNoticeAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"noticeTitle": f"测试公告_{timestamp}",
"noticeType": "1",
"noticeContent": "测试内容",
"status": "0"
}
create_response = await api.create(data)
notice_id = create_response.json()["id"]
response = await api.delete(notice_id)
assert response.status_code == 204
@pytest.mark.notice
@pytest.mark.regression
class TestSysMessage:
"""用户消息测试类"""
@pytest.mark.asyncio
async def test_create_message(self, authenticated_client):
"""测试创建消息"""
api = SysMessageAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"userId": 1,
"title": f"测试消息_{timestamp}",
"content": "这是测试消息内容",
"type": "1"
}
response = await api.create(data)
assert response.status_code == 201
result = response.json()
assert result["title"] == data["title"]
@pytest.mark.asyncio
async def test_get_messages_by_user(self, authenticated_client):
"""测试获取用户消息"""
api = SysMessageAPI(authenticated_client)
user_id = 1
response = await api.get_by_user(user_id)
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.asyncio
async def test_get_unread_count(self, authenticated_client):
"""测试获取未读消息数量"""
api = SysMessageAPI(authenticated_client)
user_id = 1
response = await api.get_unread_count(user_id)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_mark_message_as_read(self, authenticated_client):
"""测试标记消息为已读"""
api = SysMessageAPI(authenticated_client)
timestamp = int(time.time() * 1000)
data = {
"userId": 1,
"title": f"测试消息_{timestamp}",
"content": "测试内容",
"type": "1"
}
create_response = await api.create(data)
message_id = create_response.json()["id"]
response = await api.mark_as_read(message_id)
assert response.status_code == 200
+186 -27
View File
@@ -20,9 +20,10 @@ class TestRole:
assert response.status_code == 201
data = response.json()
assert "id" in data
assert data["name"] == test_role_data["name"]
assert data["description"] == test_role_data["description"]
assert data["permissions"] == test_role_data["permissions"]
assert data["roleName"] == test_role_data["roleName"]
assert data["roleKey"] == test_role_data["roleKey"]
assert data["roleSort"] == test_role_data["roleSort"]
assert data["status"] == test_role_data["status"]
cleanup_role.append(data["id"])
@@ -53,7 +54,7 @@ class TestRole:
assert response.status_code == 200
data = response.json()
assert data["id"] == role_id
assert data["name"] == test_role_data["name"]
assert data["roleName"] == test_role_data["roleName"]
cleanup_role.append(role_id)
@@ -73,11 +74,11 @@ class TestRole:
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.get_role_by_name(test_role_data["name"])
response = await role_api.get_role_by_name(test_role_data["roleName"])
assert response.status_code == 200
data = response.json()
assert data["name"] == test_role_data["name"]
assert data["roleName"] == test_role_data["roleName"]
cleanup_role.append(role_id)
@@ -99,18 +100,20 @@ class TestRole:
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
update_data = {"description": "Updated description"}
import time
timestamp = int(time.time() * 1000)
update_data = {"roleName": f"UPDATED_ROLE_{timestamp}"}
response = await role_api.update_role(role_id, update_data)
assert response.status_code == 200
data = response.json()
assert data["description"] == "Updated description"
assert data["roleName"] == f"UPDATED_ROLE_{timestamp}"
cleanup_role.append(role_id)
@pytest.mark.asyncio
async def test_delete_role_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试删除角色成功"""
"""测试删除角色成功(逻辑删除)"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
@@ -118,27 +121,13 @@ class TestRole:
response = await role_api.delete_role(role_id)
assert response.status_code == 204
@pytest.mark.asyncio
async def test_logical_delete_role_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试逻辑删除角色成功"""
role_api = RoleAPI(authenticated_client)
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.logical_delete_role(role_id)
assert response.status_code == 200
data = response.json()
assert data["deletedAt"] is not None
get_response = await role_api.get_role_by_id(role_id)
assert get_response.status_code == 404
get_deleted_response = await role_api.get_all_roles(include_deleted=True)
deleted_roles = get_deleted_response.json()
assert any(r["id"] == role_id for r in deleted_roles)
cleanup_role.append(role_id)
@pytest.mark.asyncio
@@ -149,7 +138,7 @@ class TestRole:
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
await role_api.logical_delete_role(role_id)
await role_api.delete_role(role_id)
response = await role_api.restore_role(role_id)
@@ -168,7 +157,7 @@ class TestRole:
create_response = await role_api.create_role(test_role_data)
role_id = create_response.json()["id"]
response = await role_api.check_name_exists(test_role_data["name"])
response = await role_api.check_name_exists(test_role_data["roleName"])
assert response.status_code == 200
assert response.json() is True
@@ -183,3 +172,173 @@ class TestRole:
assert response.status_code == 200
assert response.json() is False
@pytest.mark.asyncio
async def test_get_roles_by_page_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试分页获取角色成功"""
role_api = RoleAPI(authenticated_client)
import time
for i in range(5):
timestamp = int(time.time() * 1000)
role_data = {
"roleName": f"testrole_{timestamp}_{i}",
"roleKey": f"testrole_{timestamp}_{i}",
"roleSort": 1,
"status": 1
}
create_response = await role_api.create_role(role_data)
cleanup_role.append(create_response.json()["id"])
response = await role_api.get_roles_by_page(page=0, size=10)
assert response.status_code == 200
data = response.json()
assert "content" in data
assert "totalElements" in data
assert "totalPages" in data
assert "currentPage" in data
assert "pageSize" in data
assert len(data["content"]) <= 10
@pytest.mark.asyncio
async def test_get_roles_by_page_with_sort(self, authenticated_client, test_role_data, cleanup_role):
"""测试分页获取角色并排序成功"""
role_api = RoleAPI(authenticated_client)
import time
timestamp1 = int(time.time() * 1000)
role1_data = {
"roleName": f"role_a_{timestamp1}",
"roleKey": f"role_a_{timestamp1}",
"roleSort": 1,
"status": 1
}
create_response1 = await role_api.create_role(role1_data)
cleanup_role.append(create_response1.json()["id"])
timestamp2 = int(time.time() * 1000)
role2_data = {
"roleName": f"role_b_{timestamp2}",
"roleKey": f"role_b_{timestamp2}",
"roleSort": 2,
"status": 1
}
create_response2 = await role_api.create_role(role2_data)
cleanup_role.append(create_response2.json()["id"])
response = await role_api.get_roles_by_page(page=0, size=10, sort="roleName", order="asc")
assert response.status_code == 200
data = response.json()
role_names = [role["roleName"] for role in data["content"]]
assert role_names == sorted(role_names)
@pytest.mark.asyncio
async def test_get_roles_by_page_with_search(self, authenticated_client, test_role_data, cleanup_role):
"""测试分页获取角色并搜索成功"""
role_api = RoleAPI(authenticated_client)
import time
timestamp1 = int(time.time() * 1000)
role1_data = {
"roleName": f"search_test_role_{timestamp1}",
"roleKey": f"search_test_role_{timestamp1}",
"roleSort": 1,
"status": 1
}
create_response1 = await role_api.create_role(role1_data)
cleanup_role.append(create_response1.json()["id"])
timestamp2 = int(time.time() * 1000)
role2_data = {
"roleName": f"other_role_{timestamp2}",
"roleKey": f"other_role_{timestamp2}",
"roleSort": 1,
"status": 1
}
create_response2 = await role_api.create_role(role2_data)
cleanup_role.append(create_response2.json()["id"])
response = await role_api.get_roles_by_page(page=0, size=10, keyword="search")
assert response.status_code == 200
data = response.json()
assert len(data["content"]) >= 1
assert all("search" in role["roleName"] or "search" in role["roleKey"]
for role in data["content"])
@pytest.mark.asyncio
async def test_get_role_count_success(self, authenticated_client, test_role_data, cleanup_role):
"""测试获取角色总数成功"""
role_api = RoleAPI(authenticated_client)
initial_count_response = await role_api.get_role_count()
initial_count = initial_count_response.json()
create_response = await role_api.create_role(test_role_data)
cleanup_role.append(create_response.json()["id"])
final_count_response = await role_api.get_role_count()
final_count = final_count_response.json()
assert final_count == initial_count + 1
@pytest.mark.asyncio
async def test_get_roles_by_page_with_different_page_sizes(self, authenticated_client, test_role_data, cleanup_role):
"""测试不同页面大小的分页获取角色成功"""
role_api = RoleAPI(authenticated_client)
import time
for i in range(15):
timestamp = int(time.time() * 1000)
role_data = {
"roleName": f"pagesize_test_{timestamp}_{i}",
"roleKey": f"pagesize_test_{timestamp}_{i}",
"roleSort": 1,
"status": 1
}
create_response = await role_api.create_role(role_data)
cleanup_role.append(create_response.json()["id"])
response_size_10 = await role_api.get_roles_by_page(page=0, size=10)
assert response_size_10.status_code == 200
data_size_10 = response_size_10.json()
assert len(data_size_10["content"]) == 10
response_size_5 = await role_api.get_roles_by_page(page=0, size=5)
assert response_size_5.status_code == 200
data_size_5 = response_size_5.json()
assert len(data_size_5["content"]) == 5
@pytest.mark.asyncio
async def test_get_roles_by_page_with_page_navigation(self, authenticated_client, test_role_data, cleanup_role):
"""测试分页导航成功"""
role_api = RoleAPI(authenticated_client)
import time
for i in range(25):
timestamp = int(time.time() * 1000)
role_data = {
"roleName": f"pagination_test_{timestamp}_{i}",
"roleKey": f"pagination_test_{timestamp}_{i}",
"roleSort": 1,
"status": 1
}
create_response = await role_api.create_role(role_data)
cleanup_role.append(create_response.json()["id"])
page1_response = await role_api.get_roles_by_page(page=0, size=10)
page1_data = page1_response.json()
assert page1_data["currentPage"] == 0
assert len(page1_data["content"]) == 10
page2_response = await role_api.get_roles_by_page(page=1, size=10)
page2_data = page2_response.json()
assert page2_data["currentPage"] == 1
assert len(page2_data["content"]) == 10
page3_response = await role_api.get_roles_by_page(page=2, size=10)
page3_data = page3_response.json()
assert page3_data["currentPage"] == 2
assert len(page3_data["content"]) >= 5
+149 -2
View File
@@ -114,7 +114,7 @@ class TestUser:
response = await user_api.logical_delete_user(user_id)
assert response.status_code == 200
assert response.status_code == 204
get_response = await user_api.get_user_by_id(user_id)
assert get_response.status_code == 404
@@ -137,7 +137,7 @@ class TestUser:
response = await user_api.restore_user(user_id)
assert response.status_code == 200
assert response.status_code == 204
get_response = await user_api.get_user_by_id(user_id)
assert get_response.status_code == 200
@@ -191,3 +191,150 @@ class TestUser:
assert response.status_code == 200
assert response.json() is False
@pytest.mark.asyncio
async def test_get_users_by_page_success(self, authenticated_client, test_user_data, cleanup_user):
"""测试分页获取用户成功"""
import time
user_api = UserAPI(authenticated_client)
timestamp = int(time.time() * 1000)
for i in range(5):
user_data = test_user_data.copy()
user_data["username"] = f"testuser_{timestamp}_{i}"
user_data["email"] = f"testuser_{timestamp}_{i}@example.com"
create_response = await user_api.create_user(user_data)
cleanup_user.append(create_response.json()["id"])
response = await user_api.get_users_by_page(page=0, size=10)
assert response.status_code == 200
data = response.json()
assert "content" in data
assert "totalElements" in data
assert "totalPages" in data
assert "currentPage" in data
assert "pageSize" in data
assert len(data["content"]) <= 10
@pytest.mark.asyncio
async def test_get_users_by_page_with_sort(self, authenticated_client, test_user_data, cleanup_user):
"""测试分页获取用户并排序成功"""
import time
user_api = UserAPI(authenticated_client)
timestamp = int(time.time() * 1000)
user1_data = test_user_data.copy()
user1_data["username"] = f"user_a_{timestamp}"
user1_data["email"] = f"user_a_{timestamp}@example.com"
create_response1 = await user_api.create_user(user1_data)
cleanup_user.append(create_response1.json()["id"])
user2_data = test_user_data.copy()
user2_data["username"] = f"user_b_{timestamp}"
user2_data["email"] = f"user_b_{timestamp}@example.com"
create_response2 = await user_api.create_user(user2_data)
cleanup_user.append(create_response2.json()["id"])
response = await user_api.get_users_by_page(page=0, size=10, sort="username", order="asc")
assert response.status_code == 200
data = response.json()
usernames = [user["username"] for user in data["content"]]
assert usernames == sorted(usernames)
@pytest.mark.asyncio
async def test_get_users_by_page_with_search(self, authenticated_client, test_user_data, cleanup_user):
"""测试分页获取用户并搜索成功"""
import time
user_api = UserAPI(authenticated_client)
timestamp = int(time.time() * 1000)
user1_data = test_user_data.copy()
user1_data["username"] = f"search_test_user_{timestamp}"
user1_data["email"] = f"search_test_{timestamp}@example.com"
create_response1 = await user_api.create_user(user1_data)
cleanup_user.append(create_response1.json()["id"])
user2_data = test_user_data.copy()
user2_data["username"] = f"other_user_{timestamp}"
user2_data["email"] = f"other_{timestamp}@example.com"
create_response2 = await user_api.create_user(user2_data)
cleanup_user.append(create_response2.json()["id"])
response = await user_api.get_users_by_page(page=0, size=10, keyword="search")
assert response.status_code == 200
data = response.json()
assert len(data["content"]) >= 1
assert all("search" in user["username"] or "search" in user["email"]
for user in data["content"])
@pytest.mark.asyncio
async def test_get_user_count_success(self, authenticated_client, test_user_data, cleanup_user):
"""测试获取用户总数成功"""
user_api = UserAPI(authenticated_client)
initial_count_response = await user_api.get_user_count()
initial_count = initial_count_response.json()
create_response = await user_api.create_user(test_user_data)
cleanup_user.append(create_response.json()["id"])
final_count_response = await user_api.get_user_count()
final_count = final_count_response.json()
assert final_count == initial_count + 1
@pytest.mark.asyncio
async def test_get_users_by_page_with_different_page_sizes(self, authenticated_client, test_user_data, cleanup_user):
"""测试不同页面大小的分页获取用户成功"""
import time
user_api = UserAPI(authenticated_client)
timestamp = int(time.time() * 1000)
for i in range(15):
user_data = test_user_data.copy()
user_data["username"] = f"pagesize_test_{timestamp}_{i}"
user_data["email"] = f"pagesize_test_{timestamp}_{i}@example.com"
create_response = await user_api.create_user(user_data)
cleanup_user.append(create_response.json()["id"])
response_size_10 = await user_api.get_users_by_page(page=0, size=10)
assert response_size_10.status_code == 200
data_size_10 = response_size_10.json()
assert len(data_size_10["content"]) == 10
response_size_5 = await user_api.get_users_by_page(page=0, size=5)
assert response_size_5.status_code == 200
data_size_5 = response_size_5.json()
assert len(data_size_5["content"]) == 5
@pytest.mark.asyncio
async def test_get_users_by_page_with_page_navigation(self, authenticated_client, test_user_data, cleanup_user):
"""测试分页导航成功"""
import time
user_api = UserAPI(authenticated_client)
timestamp = int(time.time() * 1000)
for i in range(25):
user_data = test_user_data.copy()
user_data["username"] = f"pagination_test_{timestamp}_{i}"
user_data["email"] = f"pagination_test_{timestamp}_{i}@example.com"
create_response = await user_api.create_user(user_data)
cleanup_user.append(create_response.json()["id"])
page1_response = await user_api.get_users_by_page(page=0, size=10)
page1_data = page1_response.json()
assert page1_data["currentPage"] == 0
assert len(page1_data["content"]) == 10
page2_response = await user_api.get_users_by_page(page=1, size=10)
page2_data = page2_response.json()
assert page2_data["currentPage"] == 1
assert len(page2_data["content"]) == 10
page3_response = await user_api.get_users_by_page(page=2, size=10)
page3_data = page3_response.json()
assert page3_data["currentPage"] == 2
assert len(page3_data["content"]) >= 5