"""测试认证管理器""" import pytest from unittest.mock import Mock, patch, MagicMock from apitest.client.auth_manager import AuthManager from apitest.models.exceptions import AuthException import time class TestAuthManager: """测试AuthManager类""" def test_init(self): """测试初始化""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) assert auth_manager.base_url == "http://localhost:8080" assert auth_manager.credentials == credentials assert auth_manager.logger == logger assert auth_manager._token is None assert auth_manager._refresh_token is None assert auth_manager._token_expiry is None assert auth_manager._login_endpoint == "/sys/auth/login" def test_set_login_endpoint(self): """测试设置登录端点""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager.set_login_endpoint("/api/custom/login") assert auth_manager._login_endpoint == "/api/custom/login" def test_set_credentials(self): """测试设置认证凭据""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager.set_credentials("newuser", "newpassword") assert auth_manager.credentials == {"username": "newuser", "password": "newpassword"} def test_set_token(self): """测试设置token""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager.set_token("test-token", 3600) assert auth_manager._token == "test-token" assert auth_manager._token_expiry is not None def test_get_token(self): """测试获取token""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager._token = "test-token" assert auth_manager.get_token() == "test-token" def test_get_token_none(self): """测试获取token(未设置)""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) assert auth_manager.get_token() is None def test_is_token_valid(self): """测试token有效性检查""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager.set_token("test-token", 3600) assert auth_manager.is_token_valid() is True def test_is_token_valid_expired(self): """测试token有效性检查(已过期)""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager.set_token("test-token", -1) assert auth_manager.is_token_valid() is False def test_is_token_valid_none(self): """测试token有效性检查(未设置)""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) assert auth_manager.is_token_valid() is False @patch('requests.post') def test_login_success(self, mock_post): """测试成功登录""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "data": { "token": "test-token", "refreshToken": "refresh-token", "expiresIn": 3600 } } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) result = auth_manager.login() assert result["data"]["token"] == "test-token" assert auth_manager._token == "test-token" assert auth_manager._refresh_token == "refresh-token" assert auth_manager._token_expiry is not None @patch('requests.post') def test_login_failure(self, mock_post): """测试登录失败""" mock_response = Mock() mock_response.status_code = 401 mock_response.json.return_value = { "data": {"error": "Invalid credentials"} } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "wrong-password"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) with pytest.raises(AuthException): auth_manager.login() @patch('requests.post') def test_login_no_token_in_response(self, mock_post): """测试登录(响应中无token)""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "data": {"message": "success"} } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) with pytest.raises(AuthException): auth_manager.login() @patch('requests.post') def test_login_http_error(self, mock_post): """测试登录(HTTP错误)""" import requests mock_post.side_effect = requests.RequestException("Network error") logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) with pytest.raises(AuthException): auth_manager.login() @patch('requests.post') def test_refresh_token_success(self, mock_post): """测试成功刷新token""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "data": { "token": "new-token", "refreshToken": "new-refresh-token", "expiresIn": 3600 } } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager._refresh_token = "old-refresh-token" result = auth_manager.refresh_token() assert result is True assert auth_manager._token == "new-token" assert auth_manager._refresh_token == "new-refresh-token" @patch('requests.post') def test_refresh_token_failure(self, mock_post): """测试刷新token失败""" mock_response = Mock() mock_response.status_code = 401 mock_response.json.return_value = { "data": {"error": "Invalid refresh token"} } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager._refresh_token = "old-refresh-token" result = auth_manager.refresh_token() assert result is False def test_refresh_token_no_refresh_token(self): """测试刷新token(无refresh token)""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) result = auth_manager.refresh_token() assert result is False @patch('requests.post') def test_refresh_token_exception(self, mock_post): """测试刷新token异常""" mock_post.side_effect = Exception("Network error") logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager._refresh_token = "old-refresh-token" result = auth_manager.refresh_token() assert result is False def test_logout(self): """测试登出""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager._token = "test-token" auth_manager._refresh_token = "refresh-token" auth_manager._token_expiry = time.time() + 3600 auth_manager.logout() assert auth_manager._token is None assert auth_manager._refresh_token is None assert auth_manager._token_expiry is None @patch('requests.post') def test_ensure_authenticated_with_valid_token(self, mock_post): """测试确保已认证(有效token)""" logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager.set_token("test-token", 3600) token = auth_manager.ensure_authenticated() assert token == "test-token" assert mock_post.call_count == 0 @patch('requests.post') def test_ensure_authenticated_with_expired_token(self, mock_post): """测试确保已认证(过期token,刷新成功)""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "data": { "token": "new-token", "refreshToken": "new-refresh-token", "expiresIn": 3600 } } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager._token = "old-token" auth_manager._refresh_token = "old-refresh-token" from datetime import datetime, timedelta auth_manager._token_expiry = datetime.now() - timedelta(seconds=100) token = auth_manager.ensure_authenticated() assert token == "new-token" assert auth_manager._token == "new-token" assert auth_manager._refresh_token == "new-refresh-token" @patch('requests.post') def test_ensure_authenticated_no_token(self, mock_post): """测试确保已认证(无token,登录成功)""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "data": { "token": "test-token", "refreshToken": "refresh-token", "expiresIn": 3600 } } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) token = auth_manager.ensure_authenticated() assert token == "test-token" @patch('requests.post') def test_ensure_authenticated_refresh_failed_login(self, mock_post): """测试确保已认证(刷新失败,重新登录)""" refresh_response = Mock() refresh_response.status_code = 401 refresh_response.json.return_value = { "data": {"error": "Invalid refresh token"} } login_response = Mock() login_response.status_code = 200 login_response.json.return_value = { "data": { "token": "test-token", "refreshToken": "refresh-token", "expiresIn": 3600 } } mock_post.side_effect = [refresh_response, login_response] logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) auth_manager._token = "old-token" auth_manager._refresh_token = "old-refresh-token" from datetime import datetime, timedelta auth_manager._token_expiry = datetime.now() - timedelta(seconds=100) token = auth_manager.ensure_authenticated() assert token == "test-token" assert auth_manager._token == "test-token" @patch('requests.post') def test_ensure_authenticated_all_failed(self, mock_post): """测试确保已认证(全部失败)""" mock_response = Mock() mock_response.status_code = 401 mock_response.json.return_value = { "data": {"error": "Authentication failed"} } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) with pytest.raises(AuthException): auth_manager.ensure_authenticated() @patch('requests.post') def test_get_auth_headers(self, mock_post): """测试获取认证请求头""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { "data": { "token": "test-token", "refreshToken": "refresh-token", "expiresIn": 3600 } } mock_post.return_value = mock_response logger = Mock() credentials = {"username": "admin", "password": "password123"} auth_manager = AuthManager("http://localhost:8080", credentials, logger) headers = auth_manager.get_auth_headers() assert headers["Authorization"] == "Bearer test-token" assert headers["Content-Type"] == "application/json"