Files
张翔 08ea5fbe98 feat(admin): 添加用户管理相关文件
添加用户管理视图、API和状态管理文件
2026-03-28 14:37:29 +08:00

183 lines
7.0 KiB
Python

"""
测试数据管理器
"""
import psycopg2
from psycopg2 import pool
from typing import Optional
from datetime import datetime
import json
import logging
from .config import TestDataManagerConfig
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TestDataManager:
def __init__(self, config: TestDataManagerConfig):
self.config = config
self.connection_pool = None
self._init_connection_pool()
def _init_connection_pool(self):
try:
self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=1,
maxconn=10,
host=self.config.db_host,
port=self.config.db_port,
database=self.config.db_name,
user=self.config.db_user,
password=self.config.db_password
)
logger.info("数据库连接池初始化成功")
except Exception as e:
logger.error(f"数据库连接池初始化失败: {e}")
raise
def _get_connection(self):
return self.connection_pool.getconn()
def _release_connection(self, conn):
self.connection_pool.putconn(conn)
def generate_test_data(self):
logger.info("开始生成测试数据...")
conn = self._get_connection()
try:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM sys_user WHERE username LIKE 'test_%'")
user_count = cursor.fetchone()[0]
if user_count > 0:
logger.info(f"测试数据已存在,跳过生成(现有 {user_count} 个测试用户)")
return
self._generate_test_users(cursor, conn)
self._generate_test_almanac_data(cursor, conn)
self._generate_test_ziwei_data(cursor, conn)
conn.commit()
logger.info("测试数据生成完成")
except Exception as e:
conn.rollback()
logger.error(f"生成测试数据失败: {e}")
raise
finally:
self._release_connection(conn)
def _generate_test_users(self, cursor, conn):
logger.info("生成测试用户...")
test_users = [
('test_admin', 'test_admin@example.com', 'ADMIN', 'ACTIVE'),
('test_user', 'test_user@example.com', 'USER', 'ACTIVE'),
('test_manager', 'test_manager@example.com', 'MANAGER', 'ACTIVE'),
('test_guest', 'test_guest@example.com', 'GUEST', 'ACTIVE'),
]
for username, email, role, status in test_users:
cursor.execute("""
INSERT INTO sys_user (username, password, email, role, status, created_at, updated_at)
VALUES (%s, '$2a$10$N.zmdr9k7uOCQb376NoUnuTJ8iAt6Z5EHsM8lE9lBOsl7iKTVKIUi', %s, %s, %s, NOW(), NOW())
ON CONFLICT (username) DO NOTHING
""", (username, email, role, status))
logger.info(f"已生成 {len(test_users)} 个测试用户")
def _generate_test_almanac_data(self, cursor, conn):
logger.info("生成测试黄历数据...")
cursor.execute("""
INSERT INTO almanac (date, year, month, day, lunar_year, lunar_month, lunar_day, ganzhi_year, ganzhi_month, ganzhi_day, created_at, updated_at)
VALUES ('2026-02-25', 2026, 2, 25, 2026, 1, 8, '丙午', '庚寅', '戊子', NOW(), NOW())
ON CONFLICT (date) DO NOTHING
""")
logger.info("已生成测试黄历数据")
def _generate_test_ziwei_data(self, cursor, conn):
logger.info("生成测试紫微斗数数据...")
cursor.execute("""
INSERT INTO ziwei_chart (user_id, birth_date, birth_time, gender, chart_data, created_at, updated_at)
SELECT id, '1990-01-01', '12:00:00', 'MALE', '{"palaces": []}'::jsonb, NOW(), NOW()
FROM sys_user
WHERE username LIKE 'test_%'
ON CONFLICT (user_id) DO NOTHING
""")
logger.info("已生成测试紫微斗数数据")
def clean_test_data(self):
logger.info("开始清理测试数据...")
conn = self._get_connection()
try:
cursor = conn.cursor()
cursor.execute("DELETE FROM fortune_history WHERE user_id IN (SELECT id FROM sys_user WHERE username LIKE 'test_%')")
deleted_history = cursor.rowcount
cursor.execute("DELETE FROM ziwei_chart WHERE user_id IN (SELECT id FROM sys_user WHERE username LIKE 'test_%')")
deleted_ziwei = cursor.rowcount
cursor.execute("DELETE FROM sys_user WHERE username LIKE 'test_%'")
deleted_users = cursor.rowcount
conn.commit()
logger.info(f"测试数据清理完成:删除 {deleted_users} 个用户,{deleted_ziwei} 个紫微斗数数据,{deleted_history} 个运势历史")
except Exception as e:
conn.rollback()
logger.error(f"清理测试数据失败: {e}")
raise
finally:
self._release_connection(conn)
def reset_test_data(self):
logger.info("开始重置测试数据...")
conn = self._get_connection()
try:
cursor = conn.cursor()
cursor.execute("CALL reset_test_data()")
conn.commit()
logger.info("测试数据重置完成")
except Exception as e:
conn.rollback()
logger.error(f"重置测试数据失败: {e}")
raise
finally:
self._release_connection(conn)
def get_status(self):
logger.info("获取测试数据状态...")
conn = self._get_connection()
try:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM sys_user WHERE username LIKE 'test_%'")
user_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM ziwei_chart WHERE user_id IN (SELECT id FROM sys_user WHERE username LIKE 'test_%')")
ziwei_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM fortune_history WHERE user_id IN (SELECT id FROM sys_user WHERE username LIKE 'test_%')")
history_count = cursor.fetchone()[0]
status = {
"test_users": user_count,
"ziwei_charts": ziwei_count,
"fortune_history": history_count,
"timestamp": datetime.now().isoformat()
}
logger.info(f"测试数据状态: {json.dumps(status, indent=2, ensure_ascii=False)}")
return status
except Exception as e:
logger.error(f"获取测试数据状态失败: {e}")
raise
finally:
self._release_connection(conn)
def close(self):
if self.connection_pool:
self.connection_pool.closeall()
logger.info("数据库连接池已关闭")