feat: 添加测试框架和覆盖率报告功能
feat(测试): 新增Playwright和Vitest测试配置 feat(测试): 添加测试覆盖率报告生成功能 feat(测试): 实现前后端测试脚本集成 fix(测试): 修复测试密码不匹配问题 fix(测试): 修正URL等待策略 fix(测试): 调整错误消息选择器 refactor(测试): 重构测试目录结构 refactor(测试): 优化测试用例组织方式 docs: 更新测试报告文档 docs: 添加测试覆盖率报告模板 ci: 添加Docker测试环境配置 ci: 实现测试自动化脚本 chore: 更新依赖版本 chore: 添加测试相关配置文件
This commit is contained in:
@@ -0,0 +1,255 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
服务器管理脚本 - 自动化启动和停止测试所需的服务器
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import time
|
||||
import signal
|
||||
import sys
|
||||
import os
|
||||
from typing import List, Dict, Optional
|
||||
from dataclasses import dataclass
|
||||
import requests
|
||||
|
||||
|
||||
@dataclass
|
||||
class ServerConfig:
|
||||
"""服务器配置"""
|
||||
name: str
|
||||
command: str
|
||||
port: int
|
||||
health_url: str
|
||||
working_dir: str
|
||||
env_vars: Dict[str, str] = None
|
||||
|
||||
|
||||
class ServerManager:
|
||||
"""服务器管理器"""
|
||||
|
||||
def __init__(self):
|
||||
self.servers: Dict[str, subprocess.Popen] = {}
|
||||
self.server_configs: Dict[str, ServerConfig] = {}
|
||||
|
||||
def add_server(self, config: ServerConfig):
|
||||
"""添加服务器配置"""
|
||||
self.server_configs[config.name] = config
|
||||
|
||||
def start_server(self, name: str) -> bool:
|
||||
"""启动指定服务器"""
|
||||
if name not in self.server_configs:
|
||||
print(f"❌ 服务器配置不存在: {name}")
|
||||
return False
|
||||
|
||||
if name in self.servers:
|
||||
print(f"⚠️ 服务器已在运行: {name}")
|
||||
return True
|
||||
|
||||
config = self.server_configs[name]
|
||||
print(f"🚀 启动服务器: {name}")
|
||||
print(f" 命令: {config.command}")
|
||||
print(f" 端口: {config.port}")
|
||||
|
||||
env = os.environ.copy()
|
||||
if config.env_vars:
|
||||
env.update(config.env_vars)
|
||||
|
||||
try:
|
||||
process = subprocess.Popen(
|
||||
config.command,
|
||||
shell=True,
|
||||
cwd=config.working_dir,
|
||||
env=env,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
preexec_fn=os.setsid
|
||||
)
|
||||
self.servers[name] = process
|
||||
|
||||
# 等待服务器启动
|
||||
if self._wait_for_server(config):
|
||||
print(f"✅ 服务器启动成功: {name} (PID: {process.pid})")
|
||||
return True
|
||||
else:
|
||||
print(f"❌ 服务器启动失败: {name}")
|
||||
self.stop_server(name)
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 启动服务器时出错: {name}, 错误: {e}")
|
||||
return False
|
||||
|
||||
def _wait_for_server(self, config: ServerConfig, timeout: int = 60) -> bool:
|
||||
"""等待服务器就绪"""
|
||||
print(f"⏳ 等待服务器就绪: {config.health_url}")
|
||||
start_time = time.time()
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
try:
|
||||
response = requests.get(config.health_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
print(f"✅ 服务器健康检查通过: {config.health_url}")
|
||||
return True
|
||||
except requests.exceptions.RequestException:
|
||||
time.sleep(2)
|
||||
continue
|
||||
|
||||
print(f"❌ 服务器健康检查超时: {config.health_url}")
|
||||
return False
|
||||
|
||||
def stop_server(self, name: str) -> bool:
|
||||
"""停止指定服务器"""
|
||||
if name not in self.servers:
|
||||
print(f"⚠️ 服务器未运行: {name}")
|
||||
return True
|
||||
|
||||
print(f"🛑 停止服务器: {name}")
|
||||
process = self.servers[name]
|
||||
|
||||
try:
|
||||
# 发送SIGTERM信号
|
||||
process.send_signal(signal.SIGTERM)
|
||||
|
||||
# 等待进程结束
|
||||
try:
|
||||
process.wait(timeout=10)
|
||||
print(f"✅ 服务器已停止: {name}")
|
||||
except subprocess.TimeoutExpired:
|
||||
# 如果进程没有正常结束,强制杀死
|
||||
print(f"⚠️ 强制终止服务器: {name}")
|
||||
process.kill()
|
||||
process.wait()
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 停止服务器时出错: {name}, 错误: {e}")
|
||||
return False
|
||||
|
||||
del self.servers[name]
|
||||
return True
|
||||
|
||||
def stop_all_servers(self) -> bool:
|
||||
"""停止所有服务器"""
|
||||
print("🛑 停止所有服务器...")
|
||||
success = True
|
||||
for name in list(self.servers.keys()):
|
||||
if not self.stop_server(name):
|
||||
success = False
|
||||
return success
|
||||
|
||||
def get_server_status(self, name: str) -> Optional[str]:
|
||||
"""获取服务器状态"""
|
||||
if name not in self.servers:
|
||||
return "stopped"
|
||||
|
||||
process = self.servers[name]
|
||||
if process.poll() is None:
|
||||
return "running"
|
||||
else:
|
||||
return "stopped"
|
||||
|
||||
def restart_server(self, name: str) -> bool:
|
||||
"""重启服务器"""
|
||||
print(f"🔄 重启服务器: {name}")
|
||||
if not self.stop_server(name):
|
||||
return False
|
||||
time.sleep(2)
|
||||
return self.start_server(name)
|
||||
|
||||
|
||||
def create_default_manager() -> ServerManager:
|
||||
"""创建默认的服务器管理器"""
|
||||
manager = ServerManager()
|
||||
|
||||
# 后端服务器配置
|
||||
backend_config = ServerConfig(
|
||||
name="backend",
|
||||
command="cd novalon-manage-api/manage-app && java -jar target/manage-app-1.0.0.jar",
|
||||
port=8084,
|
||||
health_url="http://localhost:8084/actuator/health",
|
||||
working_dir=".",
|
||||
env_vars={
|
||||
"DB_HOST": "localhost",
|
||||
"DB_PORT": "55432",
|
||||
"DB_NAME": "manage_system",
|
||||
"DB_USERNAME": "postgres",
|
||||
"DB_PASSWORD": "postgres"
|
||||
}
|
||||
)
|
||||
|
||||
# 前端服务器配置
|
||||
frontend_config = ServerConfig(
|
||||
name="frontend",
|
||||
command="cd novalon-manage-web && npm run dev",
|
||||
port=3003,
|
||||
health_url="http://localhost:3003",
|
||||
working_dir="."
|
||||
)
|
||||
|
||||
manager.add_server(backend_config)
|
||||
manager.add_server(frontend_config)
|
||||
|
||||
return manager
|
||||
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="服务器管理脚本")
|
||||
parser.add_argument("action", choices=["start", "stop", "restart", "status"], help="操作类型")
|
||||
parser.add_argument("--server", "-s", help="服务器名称 (backend/frontend/all)")
|
||||
parser.add_argument("--wait", "-w", type=int, default=5, help="启动后等待时间(秒)")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
manager = create_default_manager()
|
||||
|
||||
if args.action == "start":
|
||||
if args.server == "all":
|
||||
# 启动所有服务器
|
||||
for server_name in manager.server_configs.keys():
|
||||
if not manager.start_server(server_name):
|
||||
sys.exit(1)
|
||||
print(f"⏳ 等待 {args.wait} 秒让服务器完全启动...")
|
||||
time.sleep(args.wait)
|
||||
else:
|
||||
if not manager.start_server(args.server):
|
||||
sys.exit(1)
|
||||
print(f"⏳ 等待 {args.wait} 秒让服务器完全启动...")
|
||||
time.sleep(args.wait)
|
||||
|
||||
elif args.action == "stop":
|
||||
if args.server == "all":
|
||||
manager.stop_all_servers()
|
||||
else:
|
||||
if not manager.stop_server(args.server):
|
||||
sys.exit(1)
|
||||
|
||||
elif args.action == "restart":
|
||||
if args.server == "all":
|
||||
manager.stop_all_servers()
|
||||
time.sleep(2)
|
||||
for server_name in manager.server_configs.keys():
|
||||
if not manager.start_server(server_name):
|
||||
sys.exit(1)
|
||||
print(f"⏳ 等待 {args.wait} 秒让服务器完全启动...")
|
||||
time.sleep(args.wait)
|
||||
else:
|
||||
if not manager.restart_server(args.server):
|
||||
sys.exit(1)
|
||||
print(f"⏳ 等待 {args.wait} 秒让服务器完全启动...")
|
||||
time.sleep(args.wait)
|
||||
|
||||
elif args.action == "status":
|
||||
if args.server == "all":
|
||||
print("📊 服务器状态:")
|
||||
for server_name in manager.server_configs.keys():
|
||||
status = manager.get_server_status(server_name)
|
||||
print(f" {server_name}: {status}")
|
||||
else:
|
||||
status = manager.get_server_status(args.server)
|
||||
print(f"📊 {args.server} 状态: {status}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user