我来介绍几种实现 Queue 持久化的方法:
使用 pickle 序列化存储(简单但不推荐用于生产环境):import queue
import pickle
import atexit
class PersistentQueue:
def __init__(self, filename="queue.pkl"):
self.filename = filename
# 尝试加载已存在的队列数据
try:
with open(self.filename, 'rb') as f:
self.queue = pickle.load(f)
except FileNotFoundError:
self.queue = queue.Queue()
# 注册程序退出时的保存操作
atexit.register(self.save)
def put(self, item):
self.queue.put(item)
self.save() # 每次放入数据后保存
def get(self):
item = self.queue.get()
self.save() # 每次获取数据后保存
return item
def save(self):
with open(self.filename, 'wb') as f:
pickle.dump(self.queue, f) 使用 SQLite 存储(推荐用于小型应用):import sqlite3
import queue
import json
class SQLiteQueue:
def __init__(self, db_path="queue.db"):
self.conn = sqlite3.connect(db_path)
self.cursor = self.conn.cursor()
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS queue_items
(id INTEGER PRIMARY KEY AUTOINCREMENT,
item TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)
''')
self.conn.commit()
def put(self, item):
# 将数据序列化为JSON存储
self.cursor.execute('INSERT INTO queue_items (item) VALUES (?)',
(json.dumps(item),))
self.conn.commit()
def get(self):
self.cursor.execute('SELECT id, item FROM queue_items ORDER BY id LIMIT 1')
result = self.cursor.fetchone()
if result:
item_id, item_data = result
# 删除已获取的数据
self.cursor.execute('DELETE FROM queue_items WHERE id = ?', (item_id,))
self.conn.commit()
return json.loads(item_data)
raise queue.Empty()
def close(self):
self.conn.close() 使用 Redis 实现(推荐用于分布式系统):import redis
import json
class RedisQueue:
def __init__(self, name='my_queue', namespace='queue', **redis_kwargs):
self.key = f"{namespace}:{name}"
self.rdb = redis.Redis(**redis_kwargs)
def put(self, item):
self.rdb.rpush(self.key, json.dumps(item))
def get(self, block=True, timeout=None):
if block:
item = self.rdb.blpop(self.key, timeout=timeout)
if item:
item = json.loads(item[1])
return item
else:
item = self.rdb.lpop(self.key)
if item:
return json.loads(item)
return None 使用建议:
对于简单的单机应用,可以使用 SQLite 方案,它提供了可靠的持久化存储,且不需要额外的服务依赖。
对于分布式系统或需要高性能的场景,推荐使用 Redis 方案,它提供了更好的性能和可扩展性。
使用示例:
# SQLite方案示例
queue = SQLiteQueue()
queue.put({"message": "test"})
item = queue.get()
print(item) # 输出: {'message': 'test'}
# Redis方案示例
queue = RedisQueue(host='localhost', port=6379, db=0)
queue.put({"message": "test"})
item = queue.get()
print(item) # 输出: {'message': 'test'} 注意事项:
pickle 方案虽然简单,但有安全风险,不建议在生产环境使用SQLite 方案适合单机小型应用,但并发性能有限Redis 方案最灵活,但需要额外维护 Redis 服务根据实际需求选择合适的方案,考虑因素包括:数据量、并发需求、是否分布式等网友回复


