我来介绍几种实现 Queue 持久化的方法:
使用 pickle 序列化存储(简单但不推荐用于生产环境):import queue import pickle import atexit class PersistentQueue: def __init__(self, filename="queue.pkl"): self.filename = filename # 尝试加载已存在的队列数据 try: with open(self.filename, 'rb') as f: self.queue = pickle.load(f) except FileNotFoundError: self.queue = queue.Queue() # 注册程序退出时的保存操作 atexit.register(self.save) def put(self, item): self.queue.put(item) self.save() # 每次放入数据后保存 def get(self): item = self.queue.get() self.save() # 每次获取数据后保存 return item def save(self): with open(self.filename, 'wb') as f: pickle.dump(self.queue, f)使用 SQLite 存储(推荐用于小型应用):
import sqlite3 import queue import json class SQLiteQueue: def __init__(self, db_path="queue.db"): self.conn = sqlite3.connect(db_path) self.cursor = self.conn.cursor() self.cursor.execute(''' CREATE TABLE IF NOT EXISTS queue_items (id INTEGER PRIMARY KEY AUTOINCREMENT, item TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP) ''') self.conn.commit() def put(self, item): # 将数据序列化为JSON存储 self.cursor.execute('INSERT INTO queue_items (item) VALUES (?)', (json.dumps(item),)) self.conn.commit() def get(self): self.cursor.execute('SELECT id, item FROM queue_items ORDER BY id LIMIT 1') result = self.cursor.fetchone() if result: item_id, item_data = result # 删除已获取的数据 self.cursor.execute('DELETE FROM queue_items WHERE id = ?', (item_id,)) self.conn.commit() return json.loads(item_data) raise queue.Empty() def close(self): self.conn.close()使用 Redis 实现(推荐用于分布式系统):
import redis import json class RedisQueue: def __init__(self, name='my_queue', namespace='queue', **redis_kwargs): self.key = f"{namespace}:{name}" self.rdb = redis.Redis(**redis_kwargs) def put(self, item): self.rdb.rpush(self.key, json.dumps(item)) def get(self, block=True, timeout=None): if block: item = self.rdb.blpop(self.key, timeout=timeout) if item: item = json.loads(item[1]) return item else: item = self.rdb.lpop(self.key) if item: return json.loads(item) return None
使用建议:
对于简单的单机应用,可以使用 SQLite 方案,它提供了可靠的持久化存储,且不需要额外的服务依赖。
对于分布式系统或需要高性能的场景,推荐使用 Redis 方案,它提供了更好的性能和可扩展性。
使用示例:
# SQLite方案示例 queue = SQLiteQueue() queue.put({"message": "test"}) item = queue.get() print(item) # 输出: {'message': 'test'} # Redis方案示例 queue = RedisQueue(host='localhost', port=6379, db=0) queue.put({"message": "test"}) item = queue.get() print(item) # 输出: {'message': 'test'}
注意事项:
pickle 方案虽然简单,但有安全风险,不建议在生产环境使用SQLite 方案适合单机小型应用,但并发性能有限Redis 方案最灵活,但需要额外维护 Redis 服务根据实际需求选择合适的方案,考虑因素包括:数据量、并发需求、是否分布式等网友回复