golang实现redis消息队列,基于stream
概述
经过前文的介绍,对于redis实现消息队列的三种方式(list、pub-sub、stream),读者想必已经理解的很深刻了,未读过的可以再看看或者回顾一下https://www.hengyumo.cn/momoblog/detail/202204151011878。
这篇文章通过golang来基于stream实现一个消息队列。
基础结构
基本的架构如图所示:
对于每个消费者组,都有两个消费者实现。一个是负责监听读取stream最新消息,推送给msg通道。一个是定时运行,读取xpending的数据,利用xclaim筛选出超过业务限制时间的数据(死信),转移其所有权,并推送给msg通道重新消费。
针对一个stream可以有多个生产者和消费者组。
针对msg chan获取到的数据可以再启动协程消费其中的数据。
当数据消费完成时,要求要发送ack报文给stream,告知消费完成。
针对已经消费完成的数据需要有移除策略,单消费组的实现,可以直接在ack之后删除。多消费者的实现则要兼顾各个消费者当前读取的位置,取其中last_deliverd_id最小的作为队头ID;但是redis并未提供删除某个ID之前的数据这样的命令,还需要借助xrange取出数据再进行移除。
代码实现
生产者
// RedisStreamPublisher 发布生产者
type RedisStreamPublisher struct {
pool *godis.Pool
logger *log.Logger
// stream名称
streamName string
}
// NewRedisStreamPublisher 创建一个新的stream发布生产者
func NewRedisStreamPublisher(pool *godis.Pool, logger *log.Logger, streamName string) *RedisStreamPublisher {
return &RedisStreamPublisher{
pool: pool,
logger: logger,
streamName: streamName,
}
}
// SendMsg 发送消息
func (publisher *RedisStreamPublisher) SendMsg(msgMap map[string]string) (interface{}, error) {
redis, err := publisher.pool.GetResource()
if err != nil {
return nil, err
}
defer utils.CloseRedis(redis, publisher.logger)
args := []string{publisher.streamName, "*"}
for key, val := range msgMap {
args = append(args, key, val)
}
return utils.RunRedisCmd(redis, publisher.logger, "xadd", args...)
}
消费者
// RedisStreamConsumer 消费者
type RedisStreamConsumer struct {
pool *godis.Pool
logger *log.Logger
ctx context.Context
// stream名称
streamName string
// 消费者组名称
consumerGroupName string
// 消费开始时游标的位置,0-0指定开头,传入ID指定特定ID,传入$指定结尾,当消费者组已经存在时不会进行新建,但是仍然会移动游标
startCursor string
// 消费通道缓冲区大小
bufferSize int
// 单次读取消息数量
ReadSize int
// 读取stream时阻塞时间,单位毫秒
ReadBlockTime int
// 数据修复协程运行间隔,单位秒
dataRecoverInterval int
// 单次读取pending消息数量
PendingReadSize int
// 未ack消息时间超出多久时视为死信进行重新消费,单位毫秒
DeadMsgTime int
// 用于等待所有内部协程终止
wg sync.WaitGroup
// 保存msgChan
msgChan chan []*StreamMsg
}
// RedisStreamConsumerConfig 消费者的配置信息
type RedisStreamConsumerConfig struct {
Pool *godis.Pool
Logger *log.Logger
Ctx context.Context
// stream名称
StreamName string
// 消费者组名称
ConsumerGroupName string
// 消费开始时游标的位置,0-0指定开头,传入ID指定特定ID,传入$指定结尾,当消费者组已经存在时不会进行新建,但是仍然会移动游标
StartCursor string
// 消费通道缓冲区大小
BufferSize int
// 单次读取消息数量
ReadSize int
// 读取stream时阻塞时间,单位毫秒
ReadBlockTime int
// 数据修复协程运行间隔,单位秒
DataRecoverInterval int
// 单次读取pending消息数量
PendingReadSize int
// 未ack消息时间超出多久时视为死信进行重新消费,单位毫秒
DeadMsgTime int
}
// Wait 阻塞等待内部协程完成
func (consumer *RedisStreamConsumer) Wait() {
consumer.wg.Wait()
close(consumer.msgChan)
}
// GetMsgChan 读取消息
func (consumer *RedisStreamConsumer) GetMsgChan() (<-chan []*StreamMsg, error) {
if consumer.msgChan != nil {
return consumer.msgChan, nil
}
consumer.msgChan = make(chan []*StreamMsg, consumer.bufferSize)
redis, err := consumer.pool.GetResource()
if err != nil {
return nil, err
}
defer utils.CloseRedis(redis, consumer.logger)
// 创建消费者组
_, err = utils.RunRedisCmd(redis, consumer.logger, "xgroup", "create",
consumer.streamName, consumer.consumerGroupName, consumer.startCursor)
// 隐藏已经存在消费者的错误,继续运行
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
return nil, err
}
// 移动游标
_, err = utils.RunRedisCmd(redis, consumer.logger, "xgroup", "setid",
consumer.streamName, consumer.consumerGroupName, consumer.startCursor)
if err != nil {
return nil, err
}
consumer.wg.Add(1)
// stream 监听协程
go func() {
err := consumer.watchStream(context.WithValue(consumer.ctx, "name", "streamWatch"))
if err != nil {
fmt.Println("stream监听协程报错退出")
} else {
fmt.Println("stream监听协程退出")
}
consumer.wg.Done()
}()
consumer.wg.Add(1)
go func() {
err := consumer.dataRecover(context.WithValue(consumer.ctx, "name", "dataRecover"))
if err != nil {
fmt.Println("数据修复协程报错退出")
} else {
fmt.Println("数据修复协程退出")
}
consumer.wg.Done()
}()
return consumer.msgChan, nil
}
stram监听协程
// 监听stream,读取消息
func (consumer *RedisStreamConsumer) watchStream(ctx context.Context) error {
redis, err := consumer.pool.GetResource()
if err != nil {
consumer.logger.Println(err)
return err
}
defer utils.CloseRedis(redis, consumer.logger)
exist := false
for !exist {
// 读取2个,阻塞2秒
r, err := utils.RunRedisCmd(redis, consumer.logger, "xreadgroup",
"group", consumer.consumerGroupName, "readConsumer",
"count", strconv.Itoa(consumer.ReadSize), "block", strconv.Itoa(consumer.ReadBlockTime),
"streams", consumer.streamName,
">")
if err != nil {
if strings.Contains(err.Error(), "i/o timeout") {
// 此时发生异常,如果是超时超过创建时配置的超时,那么连接会关闭,需要重新申请连接
consumer.logger.Println(err)
utils.CloseRedis(redis, consumer.logger)
redis, err = consumer.pool.GetResource()
if err != nil {
consumer.logger.Println(err)
return err
}
} else {
consumer.logger.Println(err)
//return err
}
} else {
result, err := GetXgroupreadResult(r)
if err != nil {
consumer.logger.Println(err)
} else {
// 推送给消费协程进行消费
for _, xgroupreadResult := range result {
// 写入channel
consumer.msgChan <- xgroupreadResult.Msgs
}
}
}
// 监听关闭
select {
case <-ctx.Done():
exist = true
break
default:
break
}
}
return nil
}
数据修复协程
// 数据修复,死信消费
func (consumer *RedisStreamConsumer) dataRecover(ctx context.Context) error {
redis, err := consumer.pool.GetResource()
if err != nil {
consumer.logger.Println(err)
return err
}
defer utils.CloseRedis(redis, consumer.logger)
// 每隔一段读取未进行ack的数据,将等待时间超时的数据重新推送到消费者队列中
tick := time.NewTicker(time.Duration(consumer.dataRecoverInterval) * time.Second)
exits := false
for !exits {
if now, ok := <-tick.C; ok {
consumer.logger.Println(now)
pendingRsl, _ := utils.RunRedisCmd(redis, consumer.logger, "xpending",
consumer.streamName, consumer.consumerGroupName, "-", "+", strconv.Itoa(consumer.PendingReadSize))
result, _ := GetXpendingResult(pendingRsl)
if len(result) != 0 {
// 针对这些ID进行所有权转移,如果超时的话
ids := make([]string, len(result))
for j, xpendingResult := range result {
ids[j] = xpendingResult.Id
}
// 大于超时时间的数据筛出
args := []string{consumer.streamName, consumer.consumerGroupName,
"recoverConsumer", strconv.Itoa(consumer.DeadMsgTime)}
for _, id := range ids {
args = append(args, id)
}
xclaimRsl, _ := utils.RunRedisCmd(redis, consumer.logger, "xclaim", args...)
msgs, _ := GetStreamMsgs(xclaimRsl)
if len(msgs) != 0 {
// 推送给消费协程进行消费
consumer.msgChan <- msgs
}
}
}
// 监听关闭
select {
case <-ctx.Done():
tick.Stop()
exits = true
break
default:
break
}
}
return nil
}
发送Ack
// AckMsg ack消息
func (consumer *RedisStreamConsumer) AckMsg(msgs []*StreamMsg) error {
redis, _ := consumer.pool.GetResource()
defer utils.CloseRedis(redis, consumer.logger)
// 发送xack
args := []string{consumer.streamName, consumer.consumerGroupName}
for _, msg := range msgs {
args = append(args, msg.Id)
}
_, err := utils.RunRedisCmd(redis, consumer.logger, "xack", args...)
if err != nil {
return err
}
return nil
}
测试代码
通过多个协程模拟了消息推送的流程,测试消息不会被阻塞,消息应答正常。
package test
import (
"context"
"fmt"
"github.com/piaohao/godis"
"log"
"momo/mq"
"os"
"strconv"
"testing"
"time"
)
var pool = godis.NewPool(&godis.PoolConfig{}, &godis.Option{
Host: "localhost",
Port: 16379,
Db: 0,
ConnectionTimeout: 10 * time.Second,
SoTimeout: 10 * time.Second,
})
var logger = log.New(os.Stdout, "redis log ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
func Test_simple_mq(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
publisher := mq.NewRedisStreamPublisher(pool, logger, "trmq")
_, err := publisher.SendMsg(map[string]string{"msg": "hello"})
if err != nil {
t.Error(err)
}
consumer := mq.NewRedisStreamConsumer(&mq.RedisStreamConsumerConfig{
Pool: pool,
Logger: logger,
Ctx: ctx,
StreamName: "trmq",
ConsumerGroupName: "trmqcg",
StartCursor: "0-0",
BufferSize: 50,
ReadSize: 50,
ReadBlockTime: 30 * 1000,
DataRecoverInterval: 10,
PendingReadSize: 50,
DeadMsgTime: 30 * 1000,
})
msgChan, err := consumer.GetMsgChan()
if err != nil {
t.Error(err)
}
go func() {
for i := 0; i < 30; i++ {
_, err := publisher.SendMsg(map[string]string{"msg": "hello" + strconv.Itoa(i)})
if err != nil {
t.Error(err)
}
time.Sleep(1 * time.Second)
}
cancel()
consumer.Wait()
}()
for msgs := range msgChan {
msgStr := ""
msgStr += fmt.Sprint("收到消息:[\n")
for _, msg := range msgs {
msgStr += fmt.Sprintf("%v,\n", msg)
}
msgStr += fmt.Sprint("]\n")
fmt.Println(msgStr)
err := consumer.AckAndDelMsg(msgs)
if err != nil {
t.Error(err)
}
}
fmt.Println("end ...")
}
版权声明
本文章由作者“衡于墨”创作,转载请注明出处,未经允许禁止用于商业用途
发布时间:2022年05月02日 09:45:39
备案号:
闽ICP备19015193号-1
关闭特效
评论区#
还没有评论哦,期待您的评论!
引用发言