08ea5fbe98
添加用户管理视图、API和状态管理文件
231 lines
5.9 KiB
Python
231 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Caffeine缓存管理模块演示脚本
|
|
|
|
展示Caffeine缓存的核心功能。
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
from core.caffeine_cache import CaffeineCache, cache_manager
|
|
|
|
|
|
def demo_basic_operations():
|
|
"""演示基本操作"""
|
|
print("\n" + "="*60)
|
|
print("演示1: 缓存基本操作")
|
|
print("="*60)
|
|
|
|
cache = CaffeineCache()
|
|
|
|
# Put操作
|
|
cache.put("user:1", {"name": "张三", "age": 25})
|
|
cache.put("user:2", {"name": "李四", "age": 30})
|
|
print("✅ 添加2个缓存项")
|
|
|
|
# Get操作
|
|
user1 = cache.get("user:1")
|
|
print(f"✅ 获取user:1 = {user1}")
|
|
|
|
# Exists操作
|
|
exists = cache.exists("user:1")
|
|
print(f"✅ user:1存在: {exists}")
|
|
|
|
# Delete操作
|
|
cache.delete("user:1")
|
|
user1_after_delete = cache.get("user:1")
|
|
print(f"✅ 删除后user:1 = {user1_after_delete}")
|
|
|
|
|
|
def demo_expiration():
|
|
"""演示过期时间"""
|
|
print("\n" + "="*60)
|
|
print("演示2: 缓存过期时间")
|
|
print("="*60)
|
|
|
|
cache = CaffeineCache()
|
|
|
|
# 设置1秒过期时间
|
|
cache.put("temp_data", "临时数据", expire_seconds=1)
|
|
print("✅ 设置1秒过期时间的缓存")
|
|
|
|
# 立即获取
|
|
value = cache.get("temp_data")
|
|
print(f"✅ 立即获取: {value}")
|
|
|
|
# 等待1.5秒后获取
|
|
print("⏱️ 等待1.5秒...")
|
|
time.sleep(1.5)
|
|
value_after_expire = cache.get("temp_data")
|
|
print(f"✅ 过期后获取: {value_after_expire}")
|
|
|
|
|
|
def demo_capacity_limit():
|
|
"""演示容量限制"""
|
|
print("\n" + "="*60)
|
|
print("演示3: 缓存容量限制(LRU淘汰)")
|
|
print("="*60)
|
|
|
|
# 创建容量为3的缓存
|
|
cache = CaffeineCache(max_size=3)
|
|
|
|
# 添加3个缓存项
|
|
cache.put("key1", "value1")
|
|
cache.put("key2", "value2")
|
|
cache.put("key3", "value3")
|
|
print("✅ 添加3个缓存项")
|
|
|
|
# 访问key1使其最近使用
|
|
cache.get("key1")
|
|
print("✅ 访问key1使其最近使用")
|
|
|
|
# 添加第4个缓存项
|
|
cache.put("key4", "value4")
|
|
print("✅ 添加第4个缓存项key4")
|
|
|
|
# 验证key2被移除
|
|
value2 = cache.get("key2")
|
|
value1 = cache.get("key1")
|
|
print(f"✅ key2值: {value2} (应该为None,被LRU淘汰)")
|
|
print(f"✅ key1值: {value1} (应该保留,最近使用)")
|
|
|
|
|
|
def demo_statistics():
|
|
"""演示统计信息"""
|
|
print("\n" + "="*60)
|
|
print("演示4: 缓存统计信息")
|
|
print("="*60)
|
|
|
|
cache = CaffeineCache(record_stats=True)
|
|
|
|
# 执行缓存操作
|
|
cache.put("key1", "value1")
|
|
cache.get("key1") # 命中
|
|
cache.get("key1") # 命中
|
|
cache.get("key2") # 未命中
|
|
|
|
# 获取统计信息
|
|
stats = cache.get_stats()
|
|
print(f"✅ 命中次数: {stats['hit_count']}")
|
|
print(f"✅ 未命中次数: {stats['miss_count']}")
|
|
print(f"✅ 命中率: {stats['hit_rate']:.2%}")
|
|
print(f"✅ 缓存大小: {stats['size']}")
|
|
print(f"✅ 最大容量: {stats['max_size']}")
|
|
|
|
|
|
def demo_batch_operations():
|
|
"""演示批量操作"""
|
|
print("\n" + "="*60)
|
|
print("演示5: 批量操作")
|
|
print("="*60)
|
|
|
|
cache = CaffeineCache()
|
|
|
|
# 批量添加
|
|
data = {
|
|
"batch_key1": "batch_value1",
|
|
"batch_key2": "batch_value2",
|
|
"batch_key3": "batch_value3",
|
|
}
|
|
cache.put_all(data)
|
|
print(f"✅ 批量添加: {list(data.keys())}")
|
|
|
|
# 批量获取
|
|
keys = ["batch_key1", "batch_key2", "batch_key3"]
|
|
values = cache.get_all(keys)
|
|
print(f"✅ 批量获取: {values}")
|
|
|
|
# 批量删除
|
|
deleted_count = cache.delete_all(keys)
|
|
print(f"✅ 批量删除: {deleted_count}个")
|
|
|
|
# 验证删除
|
|
values_after_delete = cache.get_all(keys)
|
|
print(f"✅ 删除后: {values_after_delete}")
|
|
|
|
|
|
def demo_thread_safety():
|
|
"""演示线程安全"""
|
|
print("\n" + "="*60)
|
|
print("演示6: 线程安全")
|
|
print("="*60)
|
|
|
|
cache = CaffeineCache()
|
|
errors = []
|
|
|
|
def write_data(thread_id: int):
|
|
try:
|
|
for i in range(10):
|
|
cache.put(f"thread_{thread_id}_key_{i}", f"value_{i}")
|
|
except Exception as e:
|
|
errors.append(str(e))
|
|
|
|
# 创建5个线程并发写入
|
|
threads = []
|
|
for i in range(5):
|
|
t = threading.Thread(target=write_data, args=(i,))
|
|
threads.append(t)
|
|
t.start()
|
|
|
|
for t in threads:
|
|
t.join()
|
|
|
|
print(f"✅ 5个线程并发写入完成")
|
|
print(f"✅ 错误数: {len(errors)}")
|
|
|
|
# 验证数据完整性
|
|
total_keys = 5 * 10
|
|
count = 0
|
|
for thread_id in range(5):
|
|
for i in range(10):
|
|
if cache.get(f"thread_{thread_id}_key_{i}") is not None:
|
|
count += 1
|
|
|
|
print(f"✅ 成功写入: {count}/{total_keys} 条")
|
|
|
|
|
|
def demo_cache_manager():
|
|
"""演示缓存管理器"""
|
|
print("\n" + "="*60)
|
|
print("演示7: 缓存管理器")
|
|
print("="*60)
|
|
|
|
# 获取命名缓存
|
|
user_cache = cache_manager.get_cache("users", max_size=100, record_stats=True)
|
|
product_cache = cache_manager.get_cache("products", max_size=50, record_stats=True)
|
|
|
|
# 添加数据
|
|
user_cache.put("user:1", {"name": "张三"})
|
|
product_cache.put("product:1", {"name": "iPhone"})
|
|
|
|
print("✅ 创建2个命名缓存: users, products")
|
|
print(f"✅ user:1 = {user_cache.get('user:1')}")
|
|
print(f"✅ product:1 = {product_cache.get('product:1')}")
|
|
|
|
# 获取所有统计信息
|
|
all_stats = cache_manager.get_all_stats()
|
|
print(f"✅ 所有缓存统计: {list(all_stats.keys())}")
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
print("\n" + "="*60)
|
|
print("Caffeine缓存管理模块演示")
|
|
print("="*60)
|
|
|
|
demo_basic_operations()
|
|
demo_expiration()
|
|
demo_capacity_limit()
|
|
demo_statistics()
|
|
demo_batch_operations()
|
|
demo_thread_safety()
|
|
demo_cache_manager()
|
|
|
|
print("\n" + "="*60)
|
|
print("✅ 所有演示完成!")
|
|
print("="*60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|