+
95
-

redis实现消息队列有几种方法?

请问redis实现消息队列有几种方法?

网友回复

+
15
-

第一种通过lpop与lpush list队列

消息发送端

<?php

$redis = new Redis();

$redis->connect('127.0.0.1', 6379);

//$password = '123456';

//$redis->auth($password);

$arr = array('h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd');

foreach ($a...

点击查看剩余70%

+
15
-

还有一种sub pub消息

消息生产端

<?php

$channelName = "testPubSub";
$channelName2 = "testPubSub2";
//向指定频道发送消息
try {
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6379);
    for ($i = 0; $i < 5; $i++) {
        $data = array('key' => 'key'.$i, 'data' => 'testdata');
        $ret = $redis->publish($cha...

点击查看剩余70%

+
15
-

stream流式消息队列处理方式

<?php
//连接reids
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//创建一个消息队列
// 星号表示自动生成id,后面参数key,value

$redis->rawCommand('xadd', 'mq2', '*', 'ms1g', '1');

//在消息队列上创建一个组为mqGroup3
$redis->rawCommand('xgroup', 'create', 'mq2', 'mqGroup3', '0');

//读取组为mqGroup3的stream消息,每次读取一个,消费者为consumerA
$_data = $redis->rawCommand('xreadgroup', 'group', 'mqGroup3', 'consumerA', 'count', '1', 'streams', 'mq2', '>');
//处理完后确认消息已处理,最后一个参数是消息id
$redis->rawCommand('xack', 'group', 'mqGroup3', '12132131234-00');
echo var_export($_data, true)."\n";

Stream为redis 5.0后新增的数据结构。支持多播的可持久化消息队列,实现借鉴了Kafka设计。 Redis Stream的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。 每个Stream都有唯一的名称,它就是Redis的key,在我们首次使用xadd指令追加消息时自动创建。 每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_de...

点击查看剩余70%

+
15
-

还可以用sort set

<?php
$key = "test";
$redis = new Redis();
//建立一个长链接
$redis->connect('127.0.0.1', 6379);

$redis->zAdd(
    $key,
    time(),
    "msgvalue"
);

...

点击查看剩余70%

我知道答案,我要回答