feat: 统一JWT密钥配置并修复签名验证问题

修复前端签名生成中bodyString硬编码问题
添加start-frontend.sh脚本启动前端服务
统一manage-app和gateway的JWT密钥配置
修复Repository扫描路径问题
更新测试配置和依赖
重构表名映射为sys_user和sys_role
完善用户实体类字段映射
添加集成测试配置和测试用例
This commit is contained in:
张翔
2026-04-02 12:28:49 +08:00
parent 6392c08560
commit b0f91d74f5
63 changed files with 3287 additions and 889 deletions
@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
检查API请求和响应
"""
from playwright.sync_api import sync_playwright
import time
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
# 监听网络请求
api_requests = []
def handle_request(request):
if '/api/' in request.url:
headers = request.headers
api_requests.append({
'url': request.url,
'method': request.method,
'has_signature': 'X-Signature' in headers,
'has_timestamp': 'X-Timestamp' in headers,
'has_token': 'Authorization' in headers
})
print(f"\n请求: {request.method} {request.url}")
print(f" 签名头: {headers.get('X-Signature', 'None')[:30]}...")
print(f" 时间戳: {headers.get('X-Timestamp', 'None')}")
print(f" Token: {headers.get('Authorization', 'None')[:30]}...")
def handle_response(response):
if '/api/' in response.url:
print(f"\n响应: {response.status} {response.url}")
if response.status == 401:
print(f" ⚠️ 401错误!")
page.on('request', handle_request)
page.on('response', handle_response)
# 登录
print("登录...")
page.goto("http://localhost:3002/login")
page.wait_for_load_state("networkidle")
page.fill('input[placeholder="请输入用户名"]', 'admin')
page.fill('input[placeholder="请输入密码"]', 'admin123')
page.click('button:has-text("登录")')
# 等待Token
for i in range(10):
time.sleep(1)
token = page.evaluate("localStorage.getItem('token')")
if token:
break
print(f"\nToken: {token[:50]}...")
# 访问dashboard
print("\n\n访问Dashboard...")
page.goto("http://localhost:3002/dashboard")
page.wait_for_load_state("networkidle")
time.sleep(2)
# 访问用户管理
print("\n\n访问用户管理...")
page.goto("http://localhost:3002/users")
page.wait_for_load_state("networkidle")
time.sleep(2)
print(f"\n最终URL: {page.url}")
browser.close()
@@ -0,0 +1,125 @@
"""
检查前端实际发送的签名头
"""
from playwright.sync_api import sync_playwright
import time
def check_frontend_signature():
"""检查前端签名头"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
signature_headers = {}
def handle_request(request):
if '/api/users/page' in request.url:
signature_headers['url'] = request.url
signature_headers['method'] = request.method
signature_headers['X-Signature'] = request.headers.get('X-Signature', 'None')
signature_headers['X-Timestamp'] = request.headers.get('X-Timestamp', 'None')
signature_headers['X-Nonce'] = request.headers.get('X-Nonce', 'None')
print(f"\n捕获到用户列表请求:")
print(f" URL: {request.url}")
print(f" Method: {request.method}")
print(f" X-Signature: {signature_headers['X-Signature'][:30] if signature_headers['X-Signature'] != 'None' else 'None'}...")
print(f" X-Timestamp: {signature_headers['X-Timestamp']}")
print(f" X-Nonce: {signature_headers['X-Nonce']}")
page.on('request', handle_request)
try:
print("=" * 60)
print("检查前端签名头")
print("=" * 60)
print("\n1. 登录...")
page.goto('http://localhost:3002/login')
page.wait_for_load_state('networkidle')
page.fill('input[type="text"], input[placeholder*="用户名"]', 'admin')
page.fill('input[type="password"]', 'admin123')
with page.expect_navigation(timeout=10000):
page.click('button:has-text("登录")')
time.sleep(2)
print("\n2. 访问用户管理页面...")
page.goto('http://localhost:3002/users')
time.sleep(5)
page.wait_for_load_state('networkidle')
if signature_headers:
print("\n" + "=" * 60)
print("前端签名头信息:")
print("=" * 60)
url = signature_headers.get('url', '')
method = signature_headers.get('method', 'GET')
signature = signature_headers.get('X-Signature', 'None')
timestamp = signature_headers.get('X-Timestamp', 'None')
nonce = signature_headers.get('X-Nonce', 'None')
print(f"URL: {url}")
print(f"Method: {method}")
print(f"X-Signature: {signature}")
print(f"X-Timestamp: {timestamp}")
print(f"X-Nonce: {nonce}")
# 手动验证签名
if timestamp != 'None' and nonce != 'None':
from urllib.parse import urlparse, parse_qs
parsed = urlparse(url)
path = parsed.path
query = parsed.query
print(f"\n路径: {path}")
print(f"查询参数: {query}")
# 生成期望的签名
import hmac
import hashlib
import base64
secret = 'NovalonManageSystemSecretKey2026'
string_to_sign = '\n'.join([
method,
path,
query or '',
'',
timestamp,
nonce
])
expected_signature = base64.b64encode(
hmac.new(
secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
hashlib.sha256
).digest()
).decode('utf-8')
print(f"\n期望的签名: {expected_signature}")
print(f"实际的签名: {signature}")
if signature == expected_signature:
print("\n✅ 签名匹配")
else:
print("\n❌ 签名不匹配")
print(f"\n签名字符串:\n{string_to_sign}")
else:
print("\n❌ 未捕获到用户列表请求")
except Exception as e:
print(f"\n❌ 错误: {str(e)}")
import traceback
traceback.print_exc()
finally:
browser.close()
if __name__ == "__main__":
check_frontend_signature()
+55
View File
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
详细检查请求头
"""
from playwright.sync_api import sync_playwright
import time
import json
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
# 监听网络请求
def handle_request(request):
if '/api/' in request.url and not request.url.endswith('.ts'):
headers = request.headers
print(f"\n{'='*80}")
print(f"请求: {request.method} {request.url}")
print(f"Headers:")
for key, value in headers.items():
if key.lower() in ['authorization', 'x-signature', 'x-timestamp', 'x-nonce']:
print(f" {key}: {value[:50]}...")
def handle_response(response):
if '/api/' in response.url and not response.url.endswith('.ts'):
print(f"响应: {response.status} {response.url}")
page.on('request', handle_request)
page.on('response', handle_response)
# 登录
print("登录...")
page.goto("http://localhost:3002/login")
page.wait_for_load_state("networkidle")
page.fill('input[placeholder="请输入用户名"]', 'admin')
page.fill('input[placeholder="请输入密码"]', 'admin123')
page.click('button:has-text("登录")')
# 等待Token
for i in range(10):
time.sleep(1)
token = page.evaluate("localStorage.getItem('token')")
if token:
print(f"\n登录成功,Token: {token[:50]}...")
break
# 访问dashboard
print("\n\n访问Dashboard...")
page.goto("http://localhost:3002/dashboard")
page.wait_for_load_state("networkidle")
time.sleep(3)
browser.close()
+44
View File
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""
检查JWT密钥长度
"""
import base64
# Gateway配置的secret
gateway_secret = "U2FsdGVkX1+vZ5Y9QmKxL8nN3rP7tW2jH4fG6dA8sB1cE5yN0zX3qV7wM4"
# Manage-app默认的secret
default_secret = "default-secret-key-change-in-production"
print("Gateway secret:")
print(f" 长度: {len(gateway_secret)} bytes")
print(f" Base64解码后长度: {len(base64.b64decode(gateway_secret + '=='))} bytes")
print(f"\nManage-app默认secret:")
print(f" 长度: {len(default_secret)} bytes")
print("\nJWT算法要求:")
print(" HS256: 至少32 bytes (256 bits)")
print(" HS384: 至少48 bytes (384 bits)")
print(" HS512: 至少64 bytes (512 bits)")
print(f"\nGateway secret长度 {len(gateway_secret)} bytes:")
if len(gateway_secret) >= 64:
print(" 支持 HS512")
elif len(gateway_secret) >= 48:
print(" 支持 HS384")
elif len(gateway_secret) >= 32:
print(" 支持 HS256")
else:
print(" 不满足任何算法要求")
print(f"\nManage-app默认secret长度 {len(default_secret)} bytes:")
if len(default_secret) >= 64:
print(" 支持 HS512")
elif len(default_secret) >= 48:
print(" 支持 HS384")
elif len(default_secret) >= 32:
print(" 支持 HS256")
else:
print(" 不满足任何算法要求")
+67
View File
@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""
检查各个页面的实际内容
"""
from playwright.sync_api import sync_playwright
import time
pages_to_check = [
('Dashboard', 'http://localhost:3002/dashboard'),
('用户管理', 'http://localhost:3002/users'),
('角色管理', 'http://localhost:3002/roles'),
('菜单管理', 'http://localhost:3002/menus'),
('字典管理', 'http://localhost:3002/dict'),
('系统配置', 'http://localhost:3002/sys/config'),
('文件管理', 'http://localhost:3002/files'),
('通知管理', 'http://localhost:3002/notice'),
('操作日志', 'http://localhost:3002/oplog'),
('登录日志', 'http://localhost:3002/loginlog'),
]
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
# 登录
page.goto("http://localhost:3002/login")
page.wait_for_load_state("networkidle")
page.fill('input[placeholder="请输入用户名"]', 'admin')
page.fill('input[placeholder="请输入密码"]', 'admin123')
page.click('button:has-text("登录")')
# 等待Token
for i in range(10):
time.sleep(1)
token = page.evaluate("localStorage.getItem('token')")
if token:
break
print(f"登录成功: {token[:50] if token else 'None'}...\n")
# 检查每个页面
for name, url in pages_to_check:
print(f"检查 {name} ({url})...")
try:
page.goto(url)
page.wait_for_load_state("networkidle")
time.sleep(2)
# 检查页面内容
table_count = page.locator('table').count()
el_table_count = page.locator('.el-table').count()
body_text = page.locator('body').text_content()[:200]
print(f" URL: {page.url}")
print(f" table标签: {table_count}, .el-table: {el_table_count}")
print(f" 内容: {body_text[:100]}...")
# 截图
page.screenshot(path=f"/tmp/{name.replace('/', '_')}.png")
except Exception as e:
print(f" ❌ 错误: {e}")
print()
browser.close()
@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""
检查X-User-Id header
"""
from playwright.sync_api import sync_playwright
import time
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
# 监听网络请求
def handle_request(request):
if '/api/users/page' in request.url:
headers = request.headers
print(f"\n请求: {request.method} {request.url}")
print(f"Headers:")
for key in ['authorization', 'x-user-id', 'x-username']:
if key in headers:
print(f" {key}: {headers[key]}")
else:
print(f" {key}: 不存在")
def handle_response(response):
if '/api/users/page' in response.url:
print(f"\n响应: {response.status} {response.url}")
page.on('request', handle_request)
page.on('response', handle_response)
# 登录
print("登录...")
page.goto("http://localhost:3002/login")
page.wait_for_load_state("networkidle")
page.fill('input[placeholder="请输入用户名"]', 'admin')
page.fill('input[placeholder="请输入密码"]', 'admin123')
page.click('button:has-text("登录")')
# 等待Token
for i in range(10):
time.sleep(1)
token = page.evaluate("localStorage.getItem('token')")
if token:
print(f"\n登录成功,Token: {token[:50]}...")
break
# 访问用户管理
print("\n\n访问用户管理...")
page.goto("http://localhost:3002/users")
page.wait_for_load_state("networkidle")
time.sleep(2)
print(f"\n最终URL: {page.url}")
browser.close()
+59
View File
@@ -0,0 +1,59 @@
#!/usr/bin/env python3
"""
检查用户管理页面的请求
"""
from playwright.sync_api import sync_playwright
import time
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
# 监听网络请求
def handle_request(request):
if '/api/' in request.url and not request.url.endswith('.ts'):
headers = request.headers
print(f"\n请求: {request.method} {request.url}")
if 'authorization' in headers:
print(f" Authorization: {headers['authorization'][:50]}...")
else:
print(f" ⚠️ 没有Authorization头!")
def handle_response(response):
if '/api/' in response.url and not response.url.endswith('.ts'):
print(f"响应: {response.status} {response.url}")
if response.status == 401:
print(f" ⚠️ 401错误!")
page.on('request', handle_request)
page.on('response', handle_response)
# 登录
print("登录...")
page.goto("http://localhost:3002/login")
page.wait_for_load_state("networkidle")
page.fill('input[placeholder="请输入用户名"]', 'admin')
page.fill('input[placeholder="请输入密码"]', 'admin123')
page.click('button:has-text("登录")')
# 等待Token
for i in range(10):
time.sleep(1)
token = page.evaluate("localStorage.getItem('token')")
if token:
print(f"\n登录成功")
break
# 访问用户管理
print("\n\n访问用户管理...")
page.goto("http://localhost:3002/users")
page.wait_for_load_state("networkidle")
time.sleep(3)
print(f"\n最终URL: {page.url}")
token_after = page.evaluate("localStorage.getItem('token')")
print(f"Token: {'存在' if token_after else '不存在'}")
browser.close()
+127
View File
@@ -0,0 +1,127 @@
"""
E2E登录功能调试测试
捕获浏览器控制台日志和网络请求
"""
from playwright.sync_api import sync_playwright
import time
def debug_login():
"""调试登录功能"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
console_messages = []
network_requests = []
def handle_console(msg):
console_messages.append({
'type': msg.type,
'text': msg.text,
'location': msg.location
})
print(f"[Console {msg.type}] {msg.text}")
def handle_request(request):
if 'login' in request.url or 'auth' in request.url:
network_requests.append({
'method': request.method,
'url': request.url,
'headers': dict(request.headers)
})
print(f"[Request {request.method}] {request.url}")
def handle_response(response):
if 'login' in response.url or 'auth' in response.url:
print(f"[Response {response.status}] {response.url}")
try:
body = response.text()
print(f" Response Body: {body[:500]}")
except:
pass
page.on('console', handle_console)
page.on('request', handle_request)
page.on('response', handle_response)
try:
print("=" * 60)
print("开始调试登录流程...")
print("=" * 60)
print("\n1. 访问登录页面...")
page.goto('http://localhost:3002')
page.wait_for_load_state('networkidle')
time.sleep(2)
print("\n2. 查找登录表单元素...")
username_input = page.locator('input[type="text"], input[placeholder*="用户名"], input[placeholder*="账号"]').first
password_input = page.locator('input[type="password"]').first
login_button = page.locator('button:has-text("登录"), button:has-text("Login")').first
print(f" 用户名输入框数量: {username_input.count()}")
print(f" 密码输入框数量: {password_input.count()}")
print(f" 登录按钮数量: {login_button.count()}")
print("\n3. 填写登录表单...")
username_input.fill('admin')
password_input.fill('admin123')
print(" 已填写用户名和密码")
print("\n4. 点击登录按钮...")
login_button.click()
print("\n5. 等待响应...")
time.sleep(5)
page.wait_for_load_state('networkidle')
print("\n6. 检查结果...")
current_url = page.url
print(f" 当前URL: {current_url}")
page.screenshot(path='/tmp/login_debug_full.png', full_page=True)
print(" 截图已保存到 /tmp/login_debug_full.png")
print("\n7. 检查页面内容...")
page_content = page.content()
if '登录失败' in page_content or 'login failed' in page_content.lower():
print(" 发现登录失败提示")
error_elements = page.locator('.error, .alert-danger, [class*="error"]').all()
if error_elements:
print(f" 发现 {len(error_elements)} 个错误提示元素")
for elem in error_elements[:3]:
print(f" - {elem.text_content()}")
print("\n" + "=" * 60)
print("调试信息汇总:")
print("=" * 60)
print(f"控制台消息数量: {len(console_messages)}")
if console_messages:
print("最近的控制台消息:")
for msg in console_messages[-5:]:
print(f" [{msg['type']}] {msg['text']}")
print(f"\n网络请求数量: {len(network_requests)}")
if network_requests:
print("登录相关请求:")
for req in network_requests:
print(f" {req['method']} {req['url']}")
print("=" * 60)
return 'login' not in current_url.lower()
except Exception as e:
print(f"\n❌ 错误: {str(e)}")
import traceback
traceback.print_exc()
page.screenshot(path='/tmp/login_error_debug.png', full_page=True)
return False
finally:
browser.close()
if __name__ == "__main__":
debug_login()
+75
View File
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
调试Token丢失问题
"""
from playwright.sync_api import sync_playwright
import time
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
# 登录
print("1. 登录...")
page.goto("http://localhost:3002/login")
page.wait_for_load_state("networkidle")
page.fill('input[placeholder="请输入用户名"]', 'admin')
page.fill('input[placeholder="请输入密码"]', 'admin123')
page.click('button:has-text("登录")')
# 等待Token
for i in range(10):
time.sleep(1)
token = page.evaluate("localStorage.getItem('token')")
if token:
print(f" Token: {token[:50]}...")
break
# 检查localStorage
print("\n2. 检查localStorage...")
all_storage = page.evaluate("JSON.stringify(localStorage)")
print(f" localStorage: {all_storage[:200]}...")
# 访问dashboard
print("\n3. 访问dashboard...")
page.goto("http://localhost:3002/dashboard")
page.wait_for_load_state("networkidle")
time.sleep(1)
token_after = page.evaluate("localStorage.getItem('token')")
print(f" URL: {page.url}")
print(f" Token: {token_after[:50] if token_after else 'None'}...")
# 访问用户管理
print("\n4. 访问用户管理...")
page.goto("http://localhost:3002/users")
page.wait_for_load_state("networkidle")
time.sleep(1)
token_after2 = page.evaluate("localStorage.getItem('token')")
print(f" URL: {page.url}")
print(f" Token: {token_after2[:50] if token_after2 else 'None'}...")
# 检查是否有错误
print("\n5. 检查控制台错误...")
console_messages = []
page.on('console', lambda msg: console_messages.append(f"{msg.type}: {msg.text}"))
# 刷新页面
print("\n6. 刷新页面...")
page.reload()
page.wait_for_load_state("networkidle")
time.sleep(1)
token_after_reload = page.evaluate("localStorage.getItem('token')")
print(f" URL: {page.url}")
print(f" Token: {token_after_reload[:50] if token_after_reload else 'None'}...")
# 打印控制台消息
print("\n控制台消息:")
for msg in console_messages[-10:]:
print(f" {msg}")
browser.close()
@@ -0,0 +1,97 @@
"""
调试用户管理页面访问问题
"""
from playwright.sync_api import sync_playwright
import time
def debug_user_management():
"""调试用户管理页面访问"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
console_messages = []
def handle_console(msg):
console_messages.append({
'type': msg.type,
'text': msg.text
})
print(f"[Console {msg.type}] {msg.text}")
def handle_request(request):
if 'api' in request.url:
print(f"[Request {request.method}] {request.url}")
auth_header = request.headers.get('authorization', 'None')
token_header = request.headers.get('token', 'None')
print(f" Authorization: {auth_header[:30] if auth_header != 'None' else 'None'}...")
print(f" Token: {token_header[:30] if token_header != 'None' else 'None'}...")
def handle_response(response):
if 'api' in response.url:
print(f"[Response {response.status}] {response.url}")
page.on('console', handle_console)
page.on('request', handle_request)
page.on('response', handle_response)
try:
print("=" * 60)
print("调试用户管理页面访问")
print("=" * 60)
print("\n1. 登录...")
page.goto('http://localhost:3002/login')
page.wait_for_load_state('networkidle')
page.fill('input[type="text"], input[placeholder*="用户名"]', 'admin')
page.fill('input[type="password"]', 'admin123')
with page.expect_navigation(timeout=10000):
page.click('button:has-text("登录")')
time.sleep(2)
page.wait_for_load_state('networkidle')
token = page.evaluate('() => localStorage.getItem("token")')
print(f"\nToken after login: {token[:50] if token else 'None'}...")
print("\n2. 访问用户管理页面...")
page.goto('http://localhost:3002/users')
time.sleep(3)
page.wait_for_load_state('networkidle')
current_url = page.url
print(f"\n当前URL: {current_url}")
token_after = page.evaluate('() => localStorage.getItem("token")')
print(f"Token after navigation: {token_after[:50] if token_after else 'None'}...")
page.screenshot(path='/tmp/debug_user_mgmt.png', full_page=True)
print("\n" + "=" * 60)
print("调试信息汇总:")
print("=" * 60)
print(f"登录后Token: {'存在' if token else '不存在'}")
print(f"跳转后Token: {'存在' if token_after else '不存在'}")
print(f"最终URL: {current_url}")
if '/login' in current_url:
print("\n❌ 被重定向回登录页")
print("可能原因:")
print("1. Token在跳转时丢失")
print("2. 路由守卫检测到Token无效")
print("3. 权限验证失败")
else:
print("\n✅ 成功访问用户管理页面")
except Exception as e:
print(f"\n❌ 错误: {str(e)}")
import traceback
traceback.print_exc()
finally:
browser.close()
if __name__ == "__main__":
debug_user_management()
+80
View File
@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
快速验证测试 - 验证系统基本功能
"""
from playwright.sync_api import sync_playwright
import time
def test_basic_flow():
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
try:
print("1. 访问登录页...")
page.goto("http://localhost:3002/login", timeout=10000)
page.wait_for_load_state("networkidle", timeout=10000)
print("✅ 登录页加载成功")
print("\n2. 执行登录...")
page.fill('input[type="text"]', 'admin')
page.fill('input[type="password"]', 'admin123')
page.click('button[type="submit"]')
time.sleep(3)
current_url = page.url
print(f"当前URL: {current_url}")
if 'dashboard' in current_url or current_url != 'http://localhost:3002/login':
print("✅ 登录成功,已跳转")
token = page.evaluate("localStorage.getItem('token')")
if token:
print(f"✅ Token已保存: {token[:50]}...")
else:
print("⚠️ Token未保存")
print("\n3. 访问用户管理页...")
page.goto("http://localhost:3002/users", timeout=10000)
page.wait_for_load_state("networkidle", timeout=10000)
current_url = page.url
print(f"当前URL: {current_url}")
if 'login' not in current_url:
print("✅ 用户管理页访问成功,未重定向到登录页")
page_content = page.content()
if '用户管理' in page_content or 'Users' in page_content:
print("✅ 用户管理页面内容正确")
else:
print("⚠️ 用户管理页面内容可能不正确")
else:
print("❌ 用户管理页访问失败,被重定向到登录页")
return True
else:
print("❌ 登录失败,仍在登录页")
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
finally:
browser.close()
if __name__ == "__main__":
print("=" * 60)
print("系统快速验证测试")
print("=" * 60)
success = test_basic_flow()
print("\n" + "=" * 60)
if success:
print("✅ 系统验证通过!")
else:
print("❌ 系统验证失败!")
print("=" * 60)
+232
View File
@@ -0,0 +1,232 @@
"""
完整业务流程E2E测试
测试用户管理、角色管理等核心功能
"""
from playwright.sync_api import sync_playwright
import time
class E2ETestSuite:
def __init__(self):
self.browser = None
self.context = None
self.page = None
self.test_results = []
def setup(self):
"""初始化测试环境"""
print("\n" + "=" * 60)
print("初始化测试环境...")
print("=" * 60)
p = sync_playwright().start()
self.browser = p.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
print("✅ 浏览器初始化完成")
def teardown(self):
"""清理测试环境"""
if self.browser:
self.browser.close()
print("\n✅ 测试环境已清理")
def login(self):
"""登录功能测试"""
print("\n" + "=" * 60)
print("测试1: 登录功能")
print("=" * 60)
try:
print("1. 访问登录页面...")
self.page.goto('http://localhost:3002/login')
self.page.wait_for_load_state('networkidle')
print("2. 填写登录表单...")
self.page.fill('input[type="text"], input[placeholder*="用户名"]', 'admin')
self.page.fill('input[type="password"]', 'admin123')
print("3. 提交登录...")
with self.page.expect_navigation(timeout=10000):
self.page.click('button:has-text("登录")')
time.sleep(2)
self.page.wait_for_load_state('networkidle')
current_url = self.page.url
token = self.page.evaluate('() => localStorage.getItem("token")')
if token and '/login' not in current_url:
print("✅ 登录成功")
print(f" 当前URL: {current_url}")
self.test_results.append(("登录功能", "PASS"))
return True
else:
print("❌ 登录失败")
self.test_results.append(("登录功能", "FAIL"))
return False
except Exception as e:
print(f"❌ 登录测试错误: {str(e)}")
self.test_results.append(("登录功能", "ERROR"))
return False
def test_user_management(self):
"""用户管理功能测试"""
print("\n" + "=" * 60)
print("测试2: 用户管理功能")
print("=" * 60)
try:
print("1. 导航到用户管理页面...")
self.page.goto('http://localhost:3002/users')
time.sleep(2)
self.page.wait_for_load_state('networkidle')
print("2. 检查页面元素...")
current_url = self.page.url
print(f" 当前URL: {current_url}")
has_user_list = self.page.locator('table, .el-table').count() > 0
print(f" 用户列表表格: {'存在' if has_user_list else '不存在'}")
self.page.screenshot(path='/tmp/user_management.png', full_page=True)
print(" 截图已保存")
if '/users' in current_url:
print("✅ 用户管理页面访问成功")
self.test_results.append(("用户管理", "PASS"))
return True
else:
print("❌ 用户管理页面访问失败")
self.test_results.append(("用户管理", "FAIL"))
return False
except Exception as e:
print(f"❌ 用户管理测试错误: {str(e)}")
self.test_results.append(("用户管理", "ERROR"))
return False
def test_role_management(self):
"""角色管理功能测试"""
print("\n" + "=" * 60)
print("测试3: 角色管理功能")
print("=" * 60)
try:
print("1. 导航到角色管理页面...")
self.page.goto('http://localhost:3002/roles')
time.sleep(2)
self.page.wait_for_load_state('networkidle')
print("2. 检查页面元素...")
current_url = self.page.url
print(f" 当前URL: {current_url}")
has_role_list = self.page.locator('table, .el-table').count() > 0
print(f" 角色列表表格: {'存在' if has_role_list else '不存在'}")
self.page.screenshot(path='/tmp/role_management.png', full_page=True)
print(" 截图已保存")
if '/roles' in current_url:
print("✅ 角色管理页面访问成功")
self.test_results.append(("角色管理", "PASS"))
return True
else:
print("❌ 角色管理页面访问失败")
self.test_results.append(("角色管理", "FAIL"))
return False
except Exception as e:
print(f"❌ 角色管理测试错误: {str(e)}")
self.test_results.append(("角色管理", "ERROR"))
return False
def test_dashboard(self):
"""Dashboard功能测试"""
print("\n" + "=" * 60)
print("测试4: Dashboard功能")
print("=" * 60)
try:
print("1. 导航到Dashboard页面...")
self.page.goto('http://localhost:3002/dashboard')
time.sleep(2)
self.page.wait_for_load_state('networkidle')
print("2. 检查页面元素...")
current_url = self.page.url
print(f" 当前URL: {current_url}")
page_title = self.page.title()
print(f" 页面标题: {page_title}")
self.page.screenshot(path='/tmp/dashboard.png', full_page=True)
print(" 截图已保存")
if '/dashboard' in current_url:
print("✅ Dashboard页面访问成功")
self.test_results.append(("Dashboard", "PASS"))
return True
else:
print("❌ Dashboard页面访问失败")
self.test_results.append(("Dashboard", "FAIL"))
return False
except Exception as e:
print(f"❌ Dashboard测试错误: {str(e)}")
self.test_results.append(("Dashboard", "ERROR"))
return False
def run_all_tests(self):
"""运行所有测试"""
print("\n" + "=" * 60)
print("开始运行完整测试套件")
print("=" * 60)
self.setup()
try:
if not self.login():
print("\n❌ 登录失败,无法继续后续测试")
return
self.test_dashboard()
self.test_user_management()
self.test_role_management()
finally:
self.print_summary()
self.teardown()
def print_summary(self):
"""打印测试摘要"""
print("\n" + "=" * 60)
print("测试结果摘要")
print("=" * 60)
pass_count = sum(1 for _, result in self.test_results if result == "PASS")
fail_count = sum(1 for _, result in self.test_results if result == "FAIL")
error_count = sum(1 for _, result in self.test_results if result == "ERROR")
for test_name, result in self.test_results:
icon = "" if result == "PASS" else "" if result == "FAIL" else "⚠️"
print(f"{icon} {test_name}: {result}")
print("\n" + "-" * 60)
print(f"总计: {len(self.test_results)} 个测试")
print(f"通过: {pass_count}")
print(f"失败: {fail_count}")
print(f"错误: {error_count}")
print("=" * 60)
if fail_count == 0 and error_count == 0:
print("\n🎉 所有测试通过!")
else:
print(f"\n⚠️ 有 {fail_count + error_count} 个测试未通过")
if __name__ == "__main__":
suite = E2ETestSuite()
suite.run_all_tests()
@@ -0,0 +1,184 @@
#!/usr/bin/env python3
"""
Novalon管理系统全面业务流程测试 - 最终版
确保在同一个浏览器上下文中保持登录状态
"""
import time
import json
from datetime import datetime
from playwright.sync_api import sync_playwright, Page
class TestResult:
def __init__(self):
self.total = 0
self.passed = 0
self.failed = 0
self.errors = []
self.start_time = datetime.now()
def add_pass(self, test_name):
self.total += 1
self.passed += 1
print(f"{test_name} - 通过")
def add_fail(self, test_name, error):
self.total += 1
self.failed += 1
self.errors.append({"test": test_name, "error": str(error)})
print(f"{test_name} - 失败: {error}")
def print_summary(self):
duration = (datetime.now() - self.start_time).total_seconds()
print("\n" + "="*80)
print("测试总结")
print("="*80)
print(f"总测试数: {self.total}")
print(f"通过: {self.passed}")
print(f"失败: {self.failed}")
print(f"成功率: {(self.passed/self.total*100):.2f}%")
print(f"耗时: {duration:.2f}")
if self.errors:
print("\n失败详情:")
for error in self.errors:
print(f" - {error['test']}: {error['error']}")
print("="*80)
result = TestResult()
def login_and_keep_session(page: Page):
"""登录并保持会话"""
try:
page.goto("http://localhost:3002/login")
page.wait_for_load_state("networkidle")
page.fill('input[placeholder="请输入用户名"]', 'admin')
page.fill('input[placeholder="请输入密码"]', 'admin123')
page.click('button:has-text("登录")')
# 等待Token保存到localStorage
for i in range(10):
time.sleep(1)
token = page.evaluate("localStorage.getItem('token')")
if token:
print(f"✅ 登录成功,Token: {token[:50]}...")
return True
return False
except Exception as e:
print(f"登录失败: {e}")
return False
def test_page_load(page: Page, name: str, url: str):
"""测试页面加载"""
print(f"\n📋 测试{name}...")
try:
# 使用点击导航而不是goto,保持会话
# 先回到首页
if page.url != 'http://localhost:3002/dashboard':
page.goto("http://localhost:3002/dashboard")
page.wait_for_load_state("networkidle")
time.sleep(1)
# 通过侧边栏导航
try:
# 尝试点击侧边栏菜单
menu_item = page.locator(f'text="{name}"').first
if menu_item.is_visible():
menu_item.click()
page.wait_for_load_state("networkidle")
time.sleep(2)
else:
# 如果菜单不可见,直接导航
page.goto(url)
page.wait_for_load_state("networkidle")
time.sleep(2)
except:
# 如果点击失败,直接导航
page.goto(url)
page.wait_for_load_state("networkidle")
time.sleep(2)
# 检查是否被重定向到登录页
if '/login' in page.url:
result.add_fail(f"{name}-页面加载", "被重定向到登录页,会话丢失")
return
# 验证页面加载
page.wait_for_selector('table, [class*="card"], [class*="stats"], [class*="tree"]', timeout=5000)
result.add_pass(f"{name}-页面加载")
except Exception as e:
result.add_fail(f"{name}-页面加载", e)
def main():
print("="*80)
print("Novalon管理系统全面业务流程测试")
print("="*80)
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*80)
with sync_playwright() as p:
# 启动浏览器
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
try:
# 登录并保持会话
print("\n🔐 测试登录...")
if login_and_keep_session(page):
result.add_pass("登录功能")
else:
result.add_fail("登录功能", "登录失败")
return
# 测试仪表板
test_page_load(page, "仪表板", "http://localhost:3002/dashboard")
# 测试用户管理
test_page_load(page, "用户管理", "http://localhost:3002/users")
# 测试角色管理
test_page_load(page, "角色管理", "http://localhost:3002/roles")
# 测试菜单管理
test_page_load(page, "菜单管理", "http://localhost:3002/menus")
# 测试字典管理
test_page_load(page, "字典管理", "http://localhost:3002/dict")
# 测试系统配置
test_page_load(page, "系统配置", "http://localhost:3002/sys/config")
# 测试文件管理
test_page_load(page, "文件管理", "http://localhost:3002/files")
# 测试通知管理
test_page_load(page, "通知管理", "http://localhost:3002/notice")
# 测试操作日志
test_page_load(page, "操作日志", "http://localhost:3002/oplog")
# 测试登录日志
test_page_load(page, "登录日志", "http://localhost:3002/loginlog")
except Exception as e:
print(f"\n❌ 测试执行出错: {e}")
import traceback
traceback.print_exc()
finally:
browser.close()
# 打印测试总结
result.print_summary()
# 返回退出码
return 0 if result.failed == 0 else 1
if __name__ == "__main__":
exit(main())
@@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""
直接测试网关
"""
import requests
import time
# 先登录获取Token
login_data = {
"username": "admin",
"password": "admin123"
}
print("1. 登录...")
response = requests.post("http://localhost:8080/api/auth/login", json=login_data)
print(f"状态码: {response.status_code}")
print(f"响应: {response.text[:200]}...")
if response.status_code == 200:
token = response.json().get('token')
print(f"\nToken: {token[:50]}...")
# 测试用户管理API
print("\n2. 测试用户管理API...")
headers = {
"Authorization": f"Bearer {token}"
}
response2 = requests.get("http://localhost:8080/api/users/page?page=0&size=10", headers=headers)
print(f"状态码: {response2.status_code}")
print(f"响应: {response2.text[:200]}...")
# 测试用户统计API
print("\n3. 测试用户统计API...")
response3 = requests.get("http://localhost:8080/api/users/count", headers=headers)
print(f"状态码: {response3.status_code}")
print(f"响应: {response3.text[:200]}...")
+61
View File
@@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""
测试JWT Token解析
"""
import requests
import json
# 登录获取Token
login_data = {
"username": "admin",
"password": "admin123"
}
print("1. 登录...")
# 先通过前端proxy登录(会自动添加签名)
from playwright.sync_api import sync_playwright
import time
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto("http://localhost:3002/login")
page.wait_for_load_state("networkidle")
page.fill('input[placeholder="请输入用户名"]', 'admin')
page.fill('input[placeholder="请输入密码"]', 'admin123')
page.click('button:has-text("登录")')
# 等待Token
for i in range(10):
time.sleep(1)
token = page.evaluate("localStorage.getItem('token')")
if token:
break
browser.close()
print(f"\nToken: {token[:100]}...")
print(f"\nToken长度: {len(token)}")
# 解析Token的payload
import base64
def decode_jwt_payload(token):
parts = token.split('.')
if len(parts) != 3:
return None
payload = parts[1]
# 添加padding
padding = len(payload) % 4
if padding:
payload += '=' * (4 - padding)
decoded = base64.b64decode(payload)
return json.loads(decoded)
payload = decode_jwt_payload(token)
print(f"\nToken Payload:")
print(json.dumps(payload, indent=2))
+30
View File
@@ -0,0 +1,30 @@
#!/usr/bin/env python3
"""
测试JWT密钥
"""
import base64
# Gateway配置的secret(去掉enc:前缀)
encrypted_secret = "U2FsdGVkX1+vZ5Y9QmKxL8nN3rP7tW2jH4fG6dA8sB1cE5yN0zX3qV7wM4"
# Manage-app默认的secret
default_secret = "default-secret-key-change-in-production"
print("Gateway配置的secretBase64编码):")
print(f" {encrypted_secret}")
print(f" 长度: {len(encrypted_secret)}")
try:
decoded = base64.b64decode(encrypted_secret)
print(f"\n解码后:")
print(f" {decoded}")
print(f" 长度: {len(decoded)}")
except Exception as e:
print(f"\n解码失败: {e}")
print(f"\nManage-app默认secret:")
print(f" {default_secret}")
print(f" 长度: {len(default_secret)}")
print(f"\n两个secret是否相同: {encrypted_secret == default_secret}")
+103
View File
@@ -0,0 +1,103 @@
"""
E2E登录功能完整验证
验证登录成功后的所有状态
"""
from playwright.sync_api import sync_playwright
import time
def test_login_complete():
"""完整测试登录功能"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
try:
print("=" * 60)
print("E2E登录功能完整验证")
print("=" * 60)
print("\n1. 访问登录页面...")
page.goto('http://localhost:3002/login')
page.wait_for_load_state('networkidle')
time.sleep(1)
print("\n2. 填写登录表单...")
page.fill('input[type="text"], input[placeholder*="用户名"]', 'admin')
page.fill('input[type="password"]', 'admin123')
print(" 用户名: admin")
print(" 密码: admin123")
print("\n3. 点击登录按钮...")
with page.expect_navigation(timeout=10000):
page.click('button:has-text("登录")')
print("\n4. 等待页面加载...")
time.sleep(3)
page.wait_for_load_state('networkidle')
print("\n5. 检查登录状态...")
current_url = page.url
print(f" 当前URL: {current_url}")
token = page.evaluate('() => localStorage.getItem("token")')
userId = page.evaluate('() => localStorage.getItem("userId")')
username = page.evaluate('() => localStorage.getItem("username")')
print(f" Token: {token[:50] if token else 'None'}...")
print(f" UserId: {userId}")
print(f" Username: {username}")
print("\n6. 检查页面内容...")
page.screenshot(path='/tmp/login_complete.png', full_page=True)
print(" 截图已保存到 /tmp/login_complete.png")
page_title = page.title()
print(f" 页面标题: {page_title}")
has_dashboard = page.locator('text=Dashboard, text=仪表盘, text=首页').count() > 0
print(f" 包含Dashboard内容: {has_dashboard}")
print("\n" + "=" * 60)
print("验证结果:")
print("=" * 60)
success = True
if token and userId and username:
print("✅ localStorage数据正确")
else:
print("❌ localStorage数据缺失")
success = False
if '/login' not in current_url:
print("✅ 已跳转离开登录页")
else:
print("⚠️ 仍在登录页(可能是路由问题)")
if has_dashboard:
print("✅ Dashboard内容已加载")
else:
print("⚠️ Dashboard内容未找到")
print("=" * 60)
if success:
print("\n🎉 登录功能测试通过!")
else:
print("\n❌ 登录功能测试失败")
return success
except Exception as e:
print(f"\n❌ 测试错误: {str(e)}")
import traceback
traceback.print_exc()
page.screenshot(path='/tmp/login_error_complete.png', full_page=True)
return False
finally:
browser.close()
if __name__ == "__main__":
test_login_complete()
@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
详细登录测试 - 查看请求和响应详情
"""
from playwright.sync_api import sync_playwright
import time
def test_login_detailed():
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
requests_log = []
responses_log = []
def handle_request(request):
if '/api/' in request.url:
headers = dict(request.headers)
requests_log.append({
'url': request.url,
'method': request.method,
'headers': headers
})
print(f"\n请求: {request.method} {request.url}")
if 'authorization' in headers:
print(f" Authorization: {headers['authorization'][:50]}...")
if 'x-signature' in headers:
print(f" X-Signature: {headers['x-signature']}")
if 'x-timestamp' in headers:
print(f" X-Timestamp: {headers['x-timestamp']}")
if 'x-nonce' in headers:
print(f" X-Nonce: {headers['x-nonce']}")
def handle_response(response):
if '/api/' in response.url:
responses_log.append({
'url': response.url,
'status': response.status,
'body': response.text() if response.status != 200 else None
})
print(f"响应: {response.status} {response.url}")
if response.status != 200:
try:
body = response.text()
print(f" 错误: {body[:200]}")
except:
pass
page.on("request", handle_request)
page.on("response", handle_response)
try:
print("访问登录页...")
page.goto("http://localhost:3002/login", timeout=10000)
page.wait_for_load_state("networkidle", timeout=10000)
print("\n填写登录表单...")
page.fill('input[type="text"]', 'admin')
page.fill('input[type="password"]', 'admin123')
print("\n点击登录按钮...")
page.click('button[type="submit"]')
time.sleep(5)
current_url = page.url
print(f"\n当前URL: {current_url}")
token = page.evaluate("localStorage.getItem('token')")
print(f"Token: {token if token else '不存在'}")
if token:
print(f"Token内容: {token[:100]}...")
except Exception as e:
print(f"\n错误: {e}")
finally:
browser.close()
if __name__ == "__main__":
test_login_detailed()
+94
View File
@@ -0,0 +1,94 @@
"""
E2E登录功能测试
使用Playwright测试登录流程
"""
from playwright.sync_api import sync_playwright
import time
def test_login():
"""测试登录功能"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context()
page = context.new_page()
try:
print("1. 访问登录页面...")
page.goto('http://localhost:3002')
page.wait_for_load_state('networkidle')
time.sleep(2)
print("2. 检查页面标题...")
title = page.title()
print(f" 页面标题: {title}")
print("3. 截图保存当前页面...")
page.screenshot(path='/tmp/login_page.png', full_page=True)
print(" 截图已保存到 /tmp/login_page.png")
print("4. 查找登录表单...")
username_input = page.locator('input[type="text"], input[placeholder*="用户名"], input[placeholder*="账号"]').first
password_input = page.locator('input[type="password"]').first
login_button = page.locator('button:has-text("登录"), button:has-text("Login")').first
if username_input.count() == 0:
print(" 未找到用户名输入框,尝试其他选择器...")
username_input = page.locator('input').nth(0)
if password_input.count() == 0:
print(" 未找到密码输入框,尝试其他选择器...")
password_input = page.locator('input').nth(1)
print("5. 填写登录信息...")
username_input.fill('admin')
print(" 用户名: admin")
password_input.fill('admin123')
print(" 密码: admin123")
print("6. 点击登录按钮...")
login_button.click()
print("7. 等待登录响应...")
time.sleep(3)
page.wait_for_load_state('networkidle')
print("8. 检查登录结果...")
current_url = page.url
print(f" 当前URL: {current_url}")
page.screenshot(path='/tmp/login_result.png', full_page=True)
print(" 登录结果截图已保存到 /tmp/login_result.png")
if 'dashboard' in current_url.lower() or 'home' in current_url.lower():
print("✅ 登录成功!已跳转到主页")
return True
elif 'login' not in current_url.lower():
print("✅ 登录成功!已跳转离开登录页")
return True
else:
print("❌ 登录可能失败,仍在登录页")
return False
except Exception as e:
print(f"❌ 测试过程中出现错误: {str(e)}")
page.screenshot(path='/tmp/login_error.png', full_page=True)
print(" 错误截图已保存到 /tmp/login_error.png")
return False
finally:
browser.close()
if __name__ == "__main__":
print("=" * 60)
print("E2E登录功能测试")
print("=" * 60)
success = test_login()
print("=" * 60)
if success:
print("测试结果: ✅ 通过")
else:
print("测试结果: ❌ 失败")
print("=" * 60)
+51
View File
@@ -0,0 +1,51 @@
import hmac
import hashlib
import base64
import time
import json
import requests
SECRET = 'NovalonManageSystemSecretKey2026'
def generate_signature(method, path, query='', body='', timestamp=None, nonce=None):
if timestamp is None:
timestamp = int(time.time() * 1000)
if nonce is None:
nonce = f"{int(timestamp)}-{hash(time.time())}"
string_to_sign = f"{method}\n{path}\n{query}\n{body}\n{timestamp}\n{nonce}"
signature = hmac.new(
SECRET.encode('utf-8'),
string_to_sign.encode('utf-8'),
hashlib.sha256
).digest()
signature_base64 = base64.b64encode(signature).decode('utf-8')
return signature_base64, timestamp, nonce
method = 'POST'
path = '/api/auth/login'
body = ''
signature, timestamp, nonce = generate_signature(method, path, body=body)
print(f"X-Signature: {signature}")
print(f"X-Timestamp: {timestamp}")
print(f"X-Nonce: {nonce}")
headers = {
'Content-Type': 'application/json',
'X-Signature': signature,
'X-Timestamp': str(timestamp),
'X-Nonce': nonce
}
response = requests.post('http://localhost:8080/api/auth/login',
headers=headers,
data='{"username":"admin","password":"admin123"}',
verify=False)
print(f"\nResponse Status: {response.status_code}")
print(f"Response Body: {response.text}")
@@ -0,0 +1,123 @@
"""
测试前后端签名验证
"""
import hmac
import hashlib
import base64
import time
import requests
def generate_signature(method, path, query='', body='', timestamp=None, nonce=None):
"""生成签名(模拟后端逻辑)"""
if timestamp is None:
timestamp = int(time.time() * 1000)
if nonce is None:
nonce = f"{int(time.time())}-test123"
secret = 'NovalonManageSystemSecretKey2026'
string_to_sign = '\n'.join([
method,
path,
query or '',
body or '',
str(timestamp),
nonce
])
print(f"签名字符串:\n{string_to_sign}")
print(f"\n签名字符串长度: {len(string_to_sign)}")
signature = hmac.new(
secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
hashlib.sha256
).digest()
signature_base64 = base64.b64encode(signature).decode('utf-8')
return signature_base64, timestamp, nonce
def test_signature():
"""测试签名验证"""
print("=" * 60)
print("测试前后端签名验证")
print("=" * 60)
# 测试1: 登录接口(在白名单中,不需要签名)
print("\n测试1: 登录接口(白名单)")
login_data = {
"username": "admin",
"password": "admin123"
}
response = requests.post(
'http://localhost:8080/api/auth/login',
json=login_data
)
print(f"状态码: {response.status_code}")
if response.status_code == 200:
data = response.json()
token = data.get('token')
print(f"✅ 登录成功,获取token: {token[:50]}...")
else:
print(f"❌ 登录失败: {response.text}")
return
# 测试2: 用户列表接口(需要签名)
print("\n测试2: 用户列表接口(需要签名)")
method = 'GET'
path = '/api/users/page'
query = 'page=0&size=10&sortBy=id&sortOrder=asc'
body = ''
signature, timestamp, nonce = generate_signature(method, path, query, body)
print(f"\n生成的签名: {signature}")
print(f"时间戳: {timestamp}")
print(f"Nonce: {nonce}")
headers = {
'Authorization': f'Bearer {token}',
'X-Signature': signature,
'X-Timestamp': str(timestamp),
'X-Nonce': nonce,
'Content-Type': 'application/json'
}
url = f'http://localhost:8080{path}?{query}'
print(f"\n请求URL: {url}")
print(f"请求头:")
for key, value in headers.items():
if key in ['X-Signature', 'Authorization']:
print(f" {key}: {value[:30]}...")
else:
print(f" {key}: {value}")
response = requests.get(url, headers=headers)
print(f"\n响应状态码: {response.status_code}")
if response.status_code == 200:
print(f"✅ 签名验证成功")
data = response.json()
print(f"返回数据: {str(data)[:100]}...")
else:
print(f"❌ 签名验证失败")
print(f"响应内容: {response.text}")
# 测试3: 不带签名的请求
print("\n测试3: 不带签名的请求")
headers_no_sig = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
response = requests.get(url, headers=headers_no_sig)
print(f"响应状态码: {response.status_code}")
print(f"响应内容: {response.text[:200]}")
if __name__ == "__main__":
test_signature()
+44
View File
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""
测试Token生成和验证
"""
import base64
import json
# 从测试中获取的Token
token = "eyJhbGciOiJIUzM4NCJ9.eyJyb2xlcyI6W10sInVzZXJJZCI6MTA2NCwidXNlcm5hbWUiOiJhZG1pbiIsInN1YiI6ImFkbWluIiwiaWF0IjoxNzc1MDkxNzg4LCJleHAiOjE3NzUxNzgxODh9"
# 解析Token header
def decode_jwt_header(token):
parts = token.split('.')
if len(parts) < 1:
return None
header = parts[0]
# 添加padding
padding = len(header) % 4
if padding:
header += '=' * (4 - padding)
decoded = base64.b64decode(header)
return json.loads(decoded)
header = decode_jwt_header(token)
print("Token Header:")
print(json.dumps(header, indent=2))
print("\n算法: " + header.get('alg', 'Unknown'))
# Gateway secret
gateway_secret = "U2FsdGVkX1+vZ5Y9QmKxL8nN3rP7tW2jH4fG6dA8sB1cE5yN0zX3qV7wM4"
print(f"\nGateway secret长度: {len(gateway_secret)} bytes")
print(f"Gateway secret支持算法: HS384 (因为长度 >= 48 bytes)")
print("\n问题分析:")
print("1. manage-app使用JwtTokenProvider生成Token")
print("2. JwtTokenProvider使用Keys.hmacShaKeyFor()自动选择算法")
print("3. Gateway secret长度58 bytes,自动选择HS384算法")
print("4. Gateway使用JwtUtil验证Token")
print("5. JwtUtil使用new SecretKeySpec()创建密钥")
print("6. 需要确保JwtUtil也使用相同的算法")
@@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""
检查Repository层命名规范
"""
import os
import re
from pathlib import Path
def check_repository_naming():
"""检查Repository层命名规范"""
base_path = Path("/Users/zhangxiang/Codes/Novalon/novalon-manage-system/novalon-manage-api")
print("=" * 60)
print("Repository层命名规范检查")
print("=" * 60)
# 查找所有Repository接口
repository_interfaces = []
for java_file in base_path.rglob("*Repository.java"):
content = java_file.read_text()
if "interface" in content:
repository_interfaces.append(java_file)
print(f"\n找到 {len(repository_interfaces)} 个Repository接口:")
issues = []
for interface in sorted(repository_interfaces):
interface_name = interface.stem
content = interface.read_text()
# 检查命名规范
if interface_name.startswith('I'):
print(f"{interface_name}")
else:
print(f" ⚠️ {interface_name} (应该以I开头)")
issues.append((interface, interface_name, f"I{interface_name}"))
# 查找所有Repository实现类
repository_impls = []
for java_file in base_path.rglob("*Repository*.java"):
if "impl" in str(java_file) or "RepositoryImpl" in java_file.name:
content = java_file.read_text()
if "class" in content and "Repository" in content:
repository_impls.append(java_file)
print(f"\n找到 {len(repository_impls)} 个Repository实现类:")
for impl in sorted(repository_impls):
impl_name = impl.stem
content = impl.read_text()
# 检查是否实现了接口
implements_match = re.search(r'implements\s+(\w+)', content)
if implements_match:
interface_name = implements_match.group(1)
# 检查命名规范
if interface_name.startswith('I'):
expected_impl_name = interface_name[1:] # 移除I前缀
if impl_name == expected_impl_name:
print(f"{impl_name} implements {interface_name}")
else:
print(f" ⚠️ {impl_name} implements {interface_name}")
print(f" 建议重命名为: {expected_impl_name}")
issues.append((impl, impl_name, expected_impl_name))
else:
print(f" {impl_name} implements {interface_name} (非标准接口)")
else:
print(f"{impl_name} (未找到implements关键字)")
# 检查是否有不符合规范的命名
print("\n" + "=" * 60)
if issues:
print(f"发现 {len(issues)} 个命名不规范的问题:")
for file, current_name, expected_name in issues:
print(f" - {current_name} -> {expected_name}")
print(f" 文件: {file.relative_to(base_path)}")
else:
print("✅ 所有Repository命名都符合规范!")
print("=" * 60)
if __name__ == "__main__":
check_repository_naming()
@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""
检查Service层命名规范
"""
import os
import re
from pathlib import Path
def check_service_naming():
"""检查Service层命名规范"""
base_path = Path("/Users/zhangxiang/Codes/Novalon/novalon-manage-system/novalon-manage-api/manage-sys/src/main/java")
print("=" * 60)
print("Service层命名规范检查")
print("=" * 60)
# 查找所有Service接口
service_interfaces = []
for java_file in base_path.rglob("*Service.java"):
content = java_file.read_text()
if f"interface I" in content or re.search(r'interface\s+I\w+Service', content):
service_interfaces.append(java_file)
print(f"\n找到 {len(service_interfaces)} 个Service接口:")
for interface in sorted(service_interfaces):
interface_name = interface.stem
print(f"{interface_name}")
# 查找所有Service实现类
service_impls = []
for java_file in base_path.rglob("*Service*.java"):
if "impl" in str(java_file) or "handler" in str(java_file):
content = java_file.read_text()
if "class" in content and "Service" in content:
service_impls.append(java_file)
print(f"\n找到 {len(service_impls)} 个Service实现类:")
issues = []
for impl in sorted(service_impls):
impl_name = impl.stem
content = impl.read_text()
# 检查是否实现了接口
implements_match = re.search(r'implements\s+(\w+)', content)
if implements_match:
interface_name = implements_match.group(1)
# 检查命名规范
if interface_name.startswith('I'):
expected_impl_name = interface_name[1:] # 移除I前缀
# 特殊情况:ExceptionLogServiceImpl是适配器
if impl_name == "ExceptionLogServiceImpl":
print(f"{impl_name} (适配器类)")
elif impl_name == expected_impl_name:
print(f"{impl_name} implements {interface_name}")
else:
print(f" ⚠️ {impl_name} implements {interface_name}")
print(f" 建议重命名为: {expected_impl_name}")
issues.append((impl, impl_name, expected_impl_name))
else:
print(f" {impl_name} implements {interface_name} (非标准接口)")
else:
print(f"{impl_name} (未找到implements关键字)")
# 检查是否有不符合规范的命名
print("\n" + "=" * 60)
if issues:
print(f"发现 {len(issues)} 个命名不规范的问题:")
for impl, current_name, expected_name in issues:
print(f" - {current_name} -> {expected_name}")
print(f" 文件: {impl}")
else:
print("✅ 所有Service命名都符合规范!")
print("=" * 60)
if __name__ == "__main__":
check_service_naming()