08ea5fbe98
添加用户管理视图、API和状态管理文件
183 lines
7.0 KiB
Python
183 lines
7.0 KiB
Python
"""
Test data manager.
"""
|
|
import psycopg2
|
|
from psycopg2 import pool
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
import json
|
|
import logging
|
|
|
|
from .config import TestDataManagerConfig
|
|
|
|
|
|
# Request INFO-level root logging configuration as a side effect of importing this module.
logging.basicConfig(level=logging.INFO)
|
|
# Module-level logger, named after this module; used by TestDataManager for all messages.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TestDataManager:
    """Create, inspect and remove test fixtures in a PostgreSQL database.

    Connections come from a ``psycopg2`` ``ThreadedConnectionPool`` that is
    opened in ``__init__`` and shut down by :meth:`close`.  Test users are the
    rows of ``sys_user`` whose username starts with the literal prefix
    ``test_``; related rows in ``ziwei_chart`` and ``fortune_history`` are
    located through those users' ids.
    """

    # In SQL LIKE, ``_`` matches any single character, so an unescaped
    # 'test_%' would also match names such as "tester" or "test1".  The
    # underscore is therefore escaped, using ``!`` as an explicit escape
    # character to stay independent of backslash-handling settings.
    _TEST_USER_PATTERN = "test!_%"
    _TEST_USER_WHERE = "username LIKE %s ESCAPE '!'"
    _TEST_USER_IDS = "SELECT id FROM sys_user WHERE " + _TEST_USER_WHERE

    # Password hash stored for every generated test user (the ``$2a$10$``
    # prefix suggests bcrypt; the plaintext is not visible in this file).
    _TEST_PASSWORD_HASH = (
        "$2a$10$N.zmdr9k7uOCQb376NoUnuTJ8iAt6Z5EHsM8lE9lBOsl7iKTVKIUi"
    )

    def __init__(self, config: "TestDataManagerConfig") -> None:
        """Store *config* and open the connection pool immediately.

        Raises whatever the pool constructor raises if the pool cannot be
        created.
        """
        self.config = config
        self.connection_pool = None
        self._init_connection_pool()

    def _init_connection_pool(self) -> None:
        """Create the threaded pool (1 to 10 connections) from ``self.config``.

        Any failure is logged and re-raised unchanged.
        """
        try:
            self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=10,
                host=self.config.db_host,
                port=self.config.db_port,
                database=self.config.db_name,
                user=self.config.db_user,
                password=self.config.db_password,
            )
        except Exception as e:
            logger.error(f"数据库连接池初始化失败: {e}")
            raise
        else:
            logger.info("数据库连接池初始化成功")

    def _require_pool(self):
        """Return the live pool, or raise ``PoolError`` once it has been closed."""
        if self.connection_pool is None:
            # Same exception type a closed psycopg2 pool raises, so callers
            # that use the manager after close() keep seeing PoolError.
            raise psycopg2.pool.PoolError("connection pool is closed")
        return self.connection_pool

    def _get_connection(self):
        """Borrow a connection from the pool."""
        return self._require_pool().getconn()

    def _release_connection(self, conn) -> None:
        """Hand *conn* back to the pool."""
        self._require_pool().putconn(conn)

    def generate_test_data(self) -> None:
        """Insert the test users, almanac row and ziwei charts in one transaction.

        Nothing is inserted when at least one test user already exists.  On
        any error the transaction is rolled back, the error is logged and then
        re-raised.  The connection is always returned to the pool.
        """
        logger.info("开始生成测试数据...")
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT COUNT(*) FROM sys_user WHERE " + self._TEST_USER_WHERE,
                    (self._TEST_USER_PATTERN,),
                )
                user_count = cursor.fetchone()[0]

                if user_count > 0:
                    logger.info(f"测试数据已存在,跳过生成(现有 {user_count} 个测试用户)")
                    return

                # Users first: the ziwei charts are derived from their ids.
                self._generate_test_users(cursor, conn)
                self._generate_test_almanac_data(cursor, conn)
                self._generate_test_ziwei_data(cursor, conn)

            conn.commit()
            logger.info("测试数据生成完成")
        except Exception as e:
            conn.rollback()
            logger.error(f"生成测试数据失败: {e}")
            raise
        finally:
            self._release_connection(conn)

    def _generate_test_users(self, cursor, conn) -> None:
        """Insert the four fixed test users; existing usernames are left alone.

        *conn* is accepted for signature symmetry with the other generators
        and is not used; committing is left to the caller.
        """
        logger.info("生成测试用户...")
        test_users = [
            ('test_admin', 'test_admin@example.com', 'ADMIN', 'ACTIVE'),
            ('test_user', 'test_user@example.com', 'USER', 'ACTIVE'),
            ('test_manager', 'test_manager@example.com', 'MANAGER', 'ACTIVE'),
            ('test_guest', 'test_guest@example.com', 'GUEST', 'ACTIVE'),
        ]

        for username, email, role, status in test_users:
            cursor.execute(
                """
                INSERT INTO sys_user (username, password, email, role, status, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
                ON CONFLICT (username) DO NOTHING
                """,
                (username, self._TEST_PASSWORD_HASH, email, role, status),
            )

        logger.info(f"已生成 {len(test_users)} 个测试用户")

    def _generate_test_almanac_data(self, cursor, conn) -> None:
        """Insert the single fixed almanac row for 2026-02-25 if it is absent.

        *conn* is not used; committing is left to the caller.
        """
        logger.info("生成测试黄历数据...")
        cursor.execute(
            """
            INSERT INTO almanac (date, year, month, day, lunar_year, lunar_month, lunar_day, ganzhi_year, ganzhi_month, ganzhi_day, created_at, updated_at)
            VALUES ('2026-02-25', 2026, 2, 25, 2026, 1, 8, '丙午', '庚寅', '戊子', NOW(), NOW())
            ON CONFLICT (date) DO NOTHING
            """
        )
        logger.info("已生成测试黄历数据")

    def _generate_test_ziwei_data(self, cursor, conn) -> None:
        """Insert one placeholder ziwei chart per test user that has none.

        *conn* is not used; committing is left to the caller.
        """
        logger.info("生成测试紫微斗数数据...")
        cursor.execute(
            """
            INSERT INTO ziwei_chart (user_id, birth_date, birth_time, gender, chart_data, created_at, updated_at)
            SELECT id, '1990-01-01', '12:00:00', 'MALE', '{"palaces": []}'::jsonb, NOW(), NOW()
            FROM sys_user
            WHERE username LIKE %s ESCAPE '!'
            ON CONFLICT (user_id) DO NOTHING
            """,
            (self._TEST_USER_PATTERN,),
        )
        logger.info("已生成测试紫微斗数数据")

    def clean_test_data(self) -> None:
        """Delete all test users and their dependent rows in one transaction.

        On any error the transaction is rolled back, the error is logged and
        then re-raised.  The connection is always returned to the pool.
        """
        logger.info("开始清理测试数据...")
        conn = self._get_connection()
        pattern = (self._TEST_USER_PATTERN,)
        try:
            with conn.cursor() as cursor:
                # Dependent tables are emptied before sys_user because their
                # rows are selected through the test users' ids.
                cursor.execute(
                    "DELETE FROM fortune_history WHERE user_id IN ("
                    + self._TEST_USER_IDS + ")",
                    pattern,
                )
                deleted_history = cursor.rowcount

                cursor.execute(
                    "DELETE FROM ziwei_chart WHERE user_id IN ("
                    + self._TEST_USER_IDS + ")",
                    pattern,
                )
                deleted_ziwei = cursor.rowcount

                cursor.execute(
                    "DELETE FROM sys_user WHERE " + self._TEST_USER_WHERE,
                    pattern,
                )
                deleted_users = cursor.rowcount

            conn.commit()
            logger.info(f"测试数据清理完成:删除 {deleted_users} 个用户,{deleted_ziwei} 个紫微斗数数据,{deleted_history} 个运势历史")
        except Exception as e:
            conn.rollback()
            logger.error(f"清理测试数据失败: {e}")
            raise
        finally:
            self._release_connection(conn)

    def reset_test_data(self) -> None:
        """Invoke the database procedure ``reset_test_data()`` and commit.

        What the procedure does is defined in the database, not in this file.
        On any error the transaction is rolled back, the error is logged and
        then re-raised.  The connection is always returned to the pool.
        """
        logger.info("开始重置测试数据...")
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("CALL reset_test_data()")
            conn.commit()
            logger.info("测试数据重置完成")
        except Exception as e:
            conn.rollback()
            logger.error(f"重置测试数据失败: {e}")
            raise
        finally:
            self._release_connection(conn)

    def get_status(self) -> dict:
        """Count the current test rows.

        Returns:
            A dict with the integer counts ``test_users``, ``ziwei_charts``
            and ``fortune_history`` plus ``timestamp``, the local time of the
            query in ISO 8601 format.

        Errors are logged and re-raised.  The connection is always returned to
        the pool.
        """
        logger.info("获取测试数据状态...")
        conn = self._get_connection()
        pattern = (self._TEST_USER_PATTERN,)
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT COUNT(*) FROM sys_user WHERE " + self._TEST_USER_WHERE,
                    pattern,
                )
                user_count = cursor.fetchone()[0]

                cursor.execute(
                    "SELECT COUNT(*) FROM ziwei_chart WHERE user_id IN ("
                    + self._TEST_USER_IDS + ")",
                    pattern,
                )
                ziwei_count = cursor.fetchone()[0]

                cursor.execute(
                    "SELECT COUNT(*) FROM fortune_history WHERE user_id IN ("
                    + self._TEST_USER_IDS + ")",
                    pattern,
                )
                history_count = cursor.fetchone()[0]

            status = {
                "test_users": user_count,
                "ziwei_charts": ziwei_count,
                "fortune_history": history_count,
                "timestamp": datetime.now().isoformat(),
            }

            logger.info(f"测试数据状态: {json.dumps(status, indent=2, ensure_ascii=False)}")
            return status
        except Exception as e:
            logger.error(f"获取测试数据状态失败: {e}")
            raise
        finally:
            self._release_connection(conn)

    def close(self) -> None:
        """Close every pooled connection; calling it again is a no-op."""
        pool = self.connection_pool
        if pool is None:
            return
        # Drop the reference first so a repeated close() never calls
        # closeall() on an already closed pool.
        self.connection_pool = None
        pool.closeall()
        logger.info("数据库连接池已关闭")
|