+
86
-

回答

我来介绍几种实现 Queue 持久化的方法:

使用 pickle 序列化存储(简单但不推荐用于生产环境):
import queue
import pickle
import atexit

class PersistentQueue:
    def __init__(self, filename="queue.pkl"):
        self.filename = filename
        # 尝试加载已存在的队列数据
        try:
            with open(self.filename, 'rb') as f:
                self.queue = pickle.load(f)
        except FileNotFoundError:
            self.queue = queue.Queue()

        # 注册程序退出时的保存操作
        atexit.register(self.save)

    def put(self, item):
        self.queue.put(item)
        self.save()  # 每次放入数据后保存

    def get(self):
        item = self.queue.get()
        self.save()  # 每次获取数据后保存
        return item

    def save(self):
        with open(self.filename, 'wb') as f:
            pickle.dump(self.queue, f)
使用 SQLite 存储(推荐用于小型应用):
import sqlite3
import queue
import json

class SQLiteQueue:
    def __init__(self, db_path="queue.db"):
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS queue_items
            (id INTEGER PRIMARY KEY AUTOINCREMENT,
             item TEXT,
             timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)
        ''')
        self.conn.commit()

    def put(self, item):
        # 将数据序列化为JSON存储
        self.cursor.execute('INSERT INTO queue_items (item) VALUES (?)',
                          (json.dumps(item),))
        self.conn.commit()

    def get(self):
        self.cursor.execute('SELECT id, item FROM queue_items ORDER BY id LIMIT 1')
        result = self.cursor.fetchone()
        if result:
            item_id, item_data = result
            # 删除已获取的数据
            self.cursor.execute('DELETE FROM queue_items WHERE id = ?', (item_id,))
            self.conn.commit()
            return json.loads(item_data)
        raise queue.Empty()

    def close(self):
        self.conn.close()
使用 Redis 实现(推荐用于分布式系统):
import redis
import json

class RedisQueue:
    def __init__(self, name='my_queue', namespace='queue', **redis_kwargs):
        self.key = f"{namespace}:{name}"
        self.rdb = redis.Redis(**redis_kwargs)

    def put(self, item):
        self.rdb.rpush(self.key, json.dumps(item))

    def get(self, block=True, timeout=None):
        if block:
            item = self.rdb.blpop(self.key, timeout=timeout)
            if item:
                item = json.loads(item[1])
                return item
        else:
            item = self.rdb.lpop(self.key)
            if item:
                return json.loads(item)
        return None

使用建议:

对于简单的单机应用,可以使用 SQLite 方案,它提供了可靠的持久化存储,且不需要额外的服务依赖。

对于分布式系统或需要高性能的场景,推荐使用 Redis 方案,它提供了更好的性能和可扩展性。

使用示例:

# SQLite方案示例
queue = SQLiteQueue()
queue.put({"message": "test"})
item = queue.get()
print(item)  # 输出: {'message': 'test'}

# Redis方案示例
queue = RedisQueue(host='localhost', port=6379, db=0)
queue.put({"message": "test"})
item = queue.get()
print(item)  # 输出: {'message': 'test'}

注意事项:

pickle 方案虽然简单,但有安全风险,不建议在生产环境使用SQLite 方案适合单机小型应用,但并发性能有限Redis 方案最灵活,但需要额外维护 Redis 服务根据实际需求选择合适的方案,考虑因素包括:数据量、并发需求、是否分布式等

网友回复

我知道答案,我要回答