search.png
关于我
menu.png
golang实现redis消息队列,基于stream

概述

经过前文的介绍,对于redis实现消息队列的三种方式(list、pub-sub、stream),读者想必已经理解的很深刻了,未读过的可以再看看或者回顾一下https://www.hengyumo.cn/momoblog/detail/202204151011878。

这篇文章通过golang来基于stream实现一个消息队列。

基础结构

基本的架构如图所示:

对于每个消费者组,都有两个消费者实现。一个是负责监听读取stream最新消息,推送给msg通道。一个是定时运行,读取xpending的数据,利用xclaim筛选出超过业务限制时间的数据(死信),转移其所有权,并推送给msg通道重新消费。

针对一个stream可以有多个生产者和消费者组。

针对msg chan获取到的数据可以再启动协程消费其中的数据。

当数据消费完成时,要求要发送ack报文给stream,告知消费完成。

针对已经消费完成的数据需要有移除策略,单消费组的实现,可以直接在ack之后删除。多消费者的实现则要兼顾各个消费者当前读取的位置,取其中last_deliverd_id最小的作为队头ID;但是redis并未提供删除某个ID之前的数据这样的命令,还需要借助xrange取出数据再进行移除。

代码实现

生产者


// RedisStreamPublisher 发布生产者
type RedisStreamPublisher struct {
    pool   *godis.Pool
    logger *log.Logger
    // stream名称
    streamName string
}

// NewRedisStreamPublisher 创建一个新的stream发布生产者
func NewRedisStreamPublisher(pool *godis.Pool, logger *log.Logger, streamName string) *RedisStreamPublisher {
    return &RedisStreamPublisher{
        pool:       pool,
        logger:     logger,
        streamName: streamName,
    }
}

// SendMsg 发送消息
func (publisher *RedisStreamPublisher) SendMsg(msgMap map[string]string) (interface{}, error) {

    redis, err := publisher.pool.GetResource()
    if err != nil {
        return nil, err
    }
    defer utils.CloseRedis(redis, publisher.logger)

    args := []string{publisher.streamName, "*"}
    for key, val := range msgMap {
        args = append(args, key, val)
    }
    return utils.RunRedisCmd(redis, publisher.logger, "xadd", args...)
}

消费者

// RedisStreamConsumer 消费者
type RedisStreamConsumer struct {
    pool   *godis.Pool
    logger *log.Logger
    ctx    context.Context
    // stream名称
    streamName string
    // 消费者组名称
    consumerGroupName string
    // 消费开始时游标的位置,0-0指定开头,传入ID指定特定ID,传入$指定结尾,当消费者组已经存在时不会进行新建,但是仍然会移动游标
    startCursor string
    // 消费通道缓冲区大小
    bufferSize int
    // 单次读取消息数量
    ReadSize int
    // 读取stream时阻塞时间,单位毫秒
    ReadBlockTime int
    // 数据修复协程运行间隔,单位秒
    dataRecoverInterval int
    // 单次读取pending消息数量
    PendingReadSize int
    // 未ack消息时间超出多久时视为死信进行重新消费,单位毫秒
    DeadMsgTime int
    // 用于等待所有内部协程终止
    wg sync.WaitGroup
    // 保存msgChan
    msgChan chan []*StreamMsg
}

// RedisStreamConsumerConfig 消费者的配置信息
type RedisStreamConsumerConfig struct {
    Pool   *godis.Pool
    Logger *log.Logger
    Ctx    context.Context
    // stream名称
    StreamName string
    // 消费者组名称
    ConsumerGroupName string
    // 消费开始时游标的位置,0-0指定开头,传入ID指定特定ID,传入$指定结尾,当消费者组已经存在时不会进行新建,但是仍然会移动游标
    StartCursor string
    // 消费通道缓冲区大小
    BufferSize int
    // 单次读取消息数量
    ReadSize int
    // 读取stream时阻塞时间,单位毫秒
    ReadBlockTime int
    // 数据修复协程运行间隔,单位秒
    DataRecoverInterval int
    // 单次读取pending消息数量
    PendingReadSize int
    // 未ack消息时间超出多久时视为死信进行重新消费,单位毫秒
    DeadMsgTime int
}

// Wait 阻塞等待内部协程完成
func (consumer *RedisStreamConsumer) Wait() {
    consumer.wg.Wait()
    close(consumer.msgChan)
}

// GetMsgChan 读取消息
func (consumer *RedisStreamConsumer) GetMsgChan() (<-chan []*StreamMsg, error) {

    if consumer.msgChan != nil {
        return consumer.msgChan, nil
    }

    consumer.msgChan = make(chan []*StreamMsg, consumer.bufferSize)

    redis, err := consumer.pool.GetResource()
    if err != nil {
        return nil, err
    }
    defer utils.CloseRedis(redis, consumer.logger)
    // 创建消费者组
    _, err = utils.RunRedisCmd(redis, consumer.logger, "xgroup", "create",
        consumer.streamName, consumer.consumerGroupName, consumer.startCursor)
    // 隐藏已经存在消费者的错误,继续运行
    if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
        return nil, err
    }

    // 移动游标
    _, err = utils.RunRedisCmd(redis, consumer.logger, "xgroup", "setid",
        consumer.streamName, consumer.consumerGroupName, consumer.startCursor)
    if err != nil {
        return nil, err
    }

    consumer.wg.Add(1)
    // stream 监听协程
    go func() {
        err := consumer.watchStream(context.WithValue(consumer.ctx, "name", "streamWatch"))
        if err != nil {
            fmt.Println("stream监听协程报错退出")
        } else {
            fmt.Println("stream监听协程退出")
        }
        consumer.wg.Done()
    }()

    consumer.wg.Add(1)
    go func() {
        err := consumer.dataRecover(context.WithValue(consumer.ctx, "name", "dataRecover"))
        if err != nil {
            fmt.Println("数据修复协程报错退出")
        } else {
            fmt.Println("数据修复协程退出")
        }
        consumer.wg.Done()
    }()

    return consumer.msgChan, nil
}

stram监听协程

// 监听stream,读取消息
func (consumer *RedisStreamConsumer) watchStream(ctx context.Context) error {
    redis, err := consumer.pool.GetResource()
    if err != nil {
        consumer.logger.Println(err)
        return err
    }
    defer utils.CloseRedis(redis, consumer.logger)
    exist := false
    for !exist {
        // 读取2个,阻塞2秒
        r, err := utils.RunRedisCmd(redis, consumer.logger, "xreadgroup",
            "group", consumer.consumerGroupName, "readConsumer",
            "count", strconv.Itoa(consumer.ReadSize), "block", strconv.Itoa(consumer.ReadBlockTime),
            "streams", consumer.streamName,
            ">")
        if err != nil {
            if strings.Contains(err.Error(), "i/o timeout") {
                // 此时发生异常,如果是超时超过创建时配置的超时,那么连接会关闭,需要重新申请连接
                consumer.logger.Println(err)
                utils.CloseRedis(redis, consumer.logger)
                redis, err = consumer.pool.GetResource()
                if err != nil {
                    consumer.logger.Println(err)
                    return err
                }
            } else {
                consumer.logger.Println(err)
                //return err
            }
        } else {
            result, err := GetXgroupreadResult(r)
            if err != nil {
                consumer.logger.Println(err)
            } else {
                // 推送给消费协程进行消费
                for _, xgroupreadResult := range result {
                    // 写入channel
                    consumer.msgChan <- xgroupreadResult.Msgs
                }
            }
        }

        // 监听关闭
        select {
        case <-ctx.Done():
            exist = true
            break
        default:
            break
        }
    }
    return nil
}

数据修复协程

// 数据修复,死信消费
func (consumer *RedisStreamConsumer) dataRecover(ctx context.Context) error {
    redis, err := consumer.pool.GetResource()
    if err != nil {
        consumer.logger.Println(err)
        return err
    }
    defer utils.CloseRedis(redis, consumer.logger)

    // 每隔一段读取未进行ack的数据,将等待时间超时的数据重新推送到消费者队列中
    tick := time.NewTicker(time.Duration(consumer.dataRecoverInterval) * time.Second)
    exits := false
    for !exits {

        if now, ok := <-tick.C; ok {
            consumer.logger.Println(now)
            pendingRsl, _ := utils.RunRedisCmd(redis, consumer.logger, "xpending",
                consumer.streamName, consumer.consumerGroupName, "-", "+", strconv.Itoa(consumer.PendingReadSize))
            result, _ := GetXpendingResult(pendingRsl)
            if len(result) != 0 {
                // 针对这些ID进行所有权转移,如果超时的话
                ids := make([]string, len(result))
                for j, xpendingResult := range result {
                    ids[j] = xpendingResult.Id
                }
                // 大于超时时间的数据筛出
                args := []string{consumer.streamName, consumer.consumerGroupName,
                    "recoverConsumer", strconv.Itoa(consumer.DeadMsgTime)}
                for _, id := range ids {
                    args = append(args, id)
                }
                xclaimRsl, _ := utils.RunRedisCmd(redis, consumer.logger, "xclaim", args...)
                msgs, _ := GetStreamMsgs(xclaimRsl)
                if len(msgs) != 0 {
                    // 推送给消费协程进行消费
                    consumer.msgChan <- msgs
                }
            }
        }
        // 监听关闭
        select {
        case <-ctx.Done():
            tick.Stop()
            exits = true
            break
        default:
            break
        }
    }
    return nil
}

发送Ack

// AckMsg ack消息
func (consumer *RedisStreamConsumer) AckMsg(msgs []*StreamMsg) error {
    redis, _ := consumer.pool.GetResource()
    defer utils.CloseRedis(redis, consumer.logger)

    // 发送xack
    args := []string{consumer.streamName, consumer.consumerGroupName}
    for _, msg := range msgs {
        args = append(args, msg.Id)
    }
    _, err := utils.RunRedisCmd(redis, consumer.logger, "xack", args...)
    if err != nil {
        return err
    }

    return nil
}

测试代码

通过多个协程模拟了消息推送的流程,测试消息不会被阻塞,消息应答正常。

package test

import (
    "context"
    "fmt"
    "github.com/piaohao/godis"
    "log"
    "momo/mq"
    "os"
    "strconv"
    "testing"
    "time"
)

var pool = godis.NewPool(&godis.PoolConfig{}, &godis.Option{
    Host:              "localhost",
    Port:              16379,
    Db:                0,
    ConnectionTimeout: 10 * time.Second,
    SoTimeout:         10 * time.Second,
})

var logger = log.New(os.Stdout, "redis log ", log.Ldate|log.Lmicroseconds|log.Lshortfile)

func Test_simple_mq(t *testing.T) {

    ctx, cancel := context.WithCancel(context.Background())

    publisher := mq.NewRedisStreamPublisher(pool, logger, "trmq")
    _, err := publisher.SendMsg(map[string]string{"msg": "hello"})
    if err != nil {
        t.Error(err)
    }

    consumer := mq.NewRedisStreamConsumer(&mq.RedisStreamConsumerConfig{
        Pool:                pool,
        Logger:              logger,
        Ctx:                 ctx,
        StreamName:          "trmq",
        ConsumerGroupName:   "trmqcg",
        StartCursor:         "0-0",
        BufferSize:          50,
        ReadSize:            50,
        ReadBlockTime:       30 * 1000,
        DataRecoverInterval: 10,
        PendingReadSize:     50,
        DeadMsgTime:         30 * 1000,
    })

    msgChan, err := consumer.GetMsgChan()
    if err != nil {
        t.Error(err)
    }

    go func() {

        for i := 0; i < 30; i++ {
            _, err := publisher.SendMsg(map[string]string{"msg": "hello" + strconv.Itoa(i)})
            if err != nil {
                t.Error(err)
            }
            time.Sleep(1 * time.Second)
        }

        cancel()
        consumer.Wait()
    }()

    for msgs := range msgChan {
        msgStr := ""
        msgStr += fmt.Sprint("收到消息:[\n")
        for _, msg := range msgs {
            msgStr += fmt.Sprintf("%v,\n", msg)
        }
        msgStr += fmt.Sprint("]\n")
        fmt.Println(msgStr)

        err := consumer.AckAndDelMsg(msgs)
        if err != nil {
            t.Error(err)
        }
    }

    fmt.Println("end ...")

}

版权声明

知识共享许可协议 本文章由作者“衡于墨”创作,转载请注明出处,未经允许禁止用于商业用途

本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。
发布时间:2022年05月02日 09:45:39

评论区#

还没有评论哦,期待您的评论!

关闭特效