Files
novalon-manage-system/scripts/server_manager.py
T
张翔 e2ad1331cc feat: 添加测试框架和覆盖率报告功能
feat(测试): 新增Playwright和Vitest测试配置
feat(测试): 添加测试覆盖率报告生成功能
feat(测试): 实现前后端测试脚本集成

fix(测试): 修复测试密码不匹配问题
fix(测试): 修正URL等待策略
fix(测试): 调整错误消息选择器

refactor(测试): 重构测试目录结构
refactor(测试): 优化测试用例组织方式

docs: 更新测试报告文档
docs: 添加测试覆盖率报告模板

ci: 添加Docker测试环境配置
ci: 实现测试自动化脚本

chore: 更新依赖版本
chore: 添加测试相关配置文件
2026-03-25 09:03:37 +08:00

255 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
服务器管理脚本 - 自动化启动和停止测试所需的服务器
"""
import subprocess
import time
import signal
import sys
import os
from typing import List, Dict, Optional
from dataclasses import dataclass
import requests
@dataclass
class ServerConfig:
"""服务器配置"""
name: str
command: str
port: int
health_url: str
working_dir: str
env_vars: Dict[str, str] = None
class ServerManager:
"""服务器管理器"""
def __init__(self):
self.servers: Dict[str, subprocess.Popen] = {}
self.server_configs: Dict[str, ServerConfig] = {}
def add_server(self, config: ServerConfig):
"""添加服务器配置"""
self.server_configs[config.name] = config
def start_server(self, name: str) -> bool:
"""启动指定服务器"""
if name not in self.server_configs:
print(f"❌ 服务器配置不存在: {name}")
return False
if name in self.servers:
print(f"⚠️ 服务器已在运行: {name}")
return True
config = self.server_configs[name]
print(f"🚀 启动服务器: {name}")
print(f" 命令: {config.command}")
print(f" 端口: {config.port}")
env = os.environ.copy()
if config.env_vars:
env.update(config.env_vars)
try:
process = subprocess.Popen(
config.command,
shell=True,
cwd=config.working_dir,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid
)
self.servers[name] = process
# 等待服务器启动
if self._wait_for_server(config):
print(f"✅ 服务器启动成功: {name} (PID: {process.pid})")
return True
else:
print(f"❌ 服务器启动失败: {name}")
self.stop_server(name)
return False
except Exception as e:
print(f"❌ 启动服务器时出错: {name}, 错误: {e}")
return False
def _wait_for_server(self, config: ServerConfig, timeout: int = 60) -> bool:
"""等待服务器就绪"""
print(f"⏳ 等待服务器就绪: {config.health_url}")
start_time = time.time()
while time.time() - start_time < timeout:
try:
response = requests.get(config.health_url, timeout=5)
if response.status_code == 200:
print(f"✅ 服务器健康检查通过: {config.health_url}")
return True
except requests.exceptions.RequestException:
time.sleep(2)
continue
print(f"❌ 服务器健康检查超时: {config.health_url}")
return False
def stop_server(self, name: str) -> bool:
"""停止指定服务器"""
if name not in self.servers:
print(f"⚠️ 服务器未运行: {name}")
return True
print(f"🛑 停止服务器: {name}")
process = self.servers[name]
try:
# 发送SIGTERM信号
process.send_signal(signal.SIGTERM)
# 等待进程结束
try:
process.wait(timeout=10)
print(f"✅ 服务器已停止: {name}")
except subprocess.TimeoutExpired:
# 如果进程没有正常结束,强制杀死
print(f"⚠️ 强制终止服务器: {name}")
process.kill()
process.wait()
except Exception as e:
print(f"❌ 停止服务器时出错: {name}, 错误: {e}")
return False
del self.servers[name]
return True
def stop_all_servers(self) -> bool:
"""停止所有服务器"""
print("🛑 停止所有服务器...")
success = True
for name in list(self.servers.keys()):
if not self.stop_server(name):
success = False
return success
def get_server_status(self, name: str) -> Optional[str]:
"""获取服务器状态"""
if name not in self.servers:
return "stopped"
process = self.servers[name]
if process.poll() is None:
return "running"
else:
return "stopped"
def restart_server(self, name: str) -> bool:
"""重启服务器"""
print(f"🔄 重启服务器: {name}")
if not self.stop_server(name):
return False
time.sleep(2)
return self.start_server(name)
def create_default_manager() -> ServerManager:
"""创建默认的服务器管理器"""
manager = ServerManager()
# 后端服务器配置
backend_config = ServerConfig(
name="backend",
command="cd novalon-manage-api/manage-app && java -jar target/manage-app-1.0.0.jar",
port=8084,
health_url="http://localhost:8084/actuator/health",
working_dir=".",
env_vars={
"DB_HOST": "localhost",
"DB_PORT": "55432",
"DB_NAME": "manage_system",
"DB_USERNAME": "postgres",
"DB_PASSWORD": "postgres"
}
)
# 前端服务器配置
frontend_config = ServerConfig(
name="frontend",
command="cd novalon-manage-web && npm run dev",
port=3003,
health_url="http://localhost:3003",
working_dir="."
)
manager.add_server(backend_config)
manager.add_server(frontend_config)
return manager
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description="服务器管理脚本")
parser.add_argument("action", choices=["start", "stop", "restart", "status"], help="操作类型")
parser.add_argument("--server", "-s", help="服务器名称 (backend/frontend/all)")
parser.add_argument("--wait", "-w", type=int, default=5, help="启动后等待时间(秒)")
args = parser.parse_args()
manager = create_default_manager()
if args.action == "start":
if args.server == "all":
# 启动所有服务器
for server_name in manager.server_configs.keys():
if not manager.start_server(server_name):
sys.exit(1)
print(f"⏳ 等待 {args.wait} 秒让服务器完全启动...")
time.sleep(args.wait)
else:
if not manager.start_server(args.server):
sys.exit(1)
print(f"⏳ 等待 {args.wait} 秒让服务器完全启动...")
time.sleep(args.wait)
elif args.action == "stop":
if args.server == "all":
manager.stop_all_servers()
else:
if not manager.stop_server(args.server):
sys.exit(1)
elif args.action == "restart":
if args.server == "all":
manager.stop_all_servers()
time.sleep(2)
for server_name in manager.server_configs.keys():
if not manager.start_server(server_name):
sys.exit(1)
print(f"⏳ 等待 {args.wait} 秒让服务器完全启动...")
time.sleep(args.wait)
else:
if not manager.restart_server(args.server):
sys.exit(1)
print(f"⏳ 等待 {args.wait} 秒让服务器完全启动...")
time.sleep(args.wait)
elif args.action == "status":
if args.server == "all":
print("📊 服务器状态:")
for server_name in manager.server_configs.keys():
status = manager.get_server_status(server_name)
print(f" {server_name}: {status}")
else:
status = manager.get_server_status(args.server)
print(f"📊 {args.server} 状态: {status}")
if __name__ == "__main__":
main()