Redis数据结构之Stream

本文最后更新于 2025年9月22日

1.概述

Redis Stream是Redis 5.0版本新增加的数据结构。

Redis Stream主要用于消息队列(MQ,Message Queue),Redis本身是有一个Redis发布订阅(pub/sub)来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃,而且没有ACK机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了,简单来说发布订阅(pub/sub)可以分发消息,但无法记录历史消息。

而Redis Stream提供了消息的持久化和主备复制功能,支持自动生成全局唯一ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream是一个链表,会将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容

Redis Stream组成部分说明:

  • Message Content

    消息

  • Consumer group

    消费组,通过XGROUP CREATE命令创建,同一个消费组可以有多个消费者

  • Last_delivered_id

    游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id往前移动

  • Consumer

    消费者,包含在消费组当中

  • Pending_ids

    消费者会布一个状态变量,用于记录被当前消费者已读取但未ack的消息id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

2.队列(生产)相关命令

2.1 XADD

添加消息到队列末尾,如果队列不存在,则会先创建队列

  • key
  • *|id 消息的ID,格式必须是时间戳-序列号这样的方式,下一条的ID必须比上一条的要大,写成*号,系统将自动生成。ID比较大小时,先比较时间戳大小,若相同再比较序列号
  • field value 键值对
XADD <key> <*|id> <field value> [<field value ...>]

127.0.0.1:6379> XADD mystream * name lzj age 28
"1758432825026-0"
127.0.0.1:6379> XADD mystream * name xxn age 24
"1758432832463-0"

返回的1748835683065-01748835967616-0就是消息的ID,-前的数字为生成消息时的毫秒级时间戳,-后面的数字代表同一毫秒内产生的第几个序列号

在相同的毫秒下序列号从0开始递增,序列号是64位长度,理论上在同一毫秒内生成的数据量无法到达这个级别,因此不用担心序列号会不够用。毫秒数取的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID,也即redis在增加信息条目时会检査当前id与上一条目的id,自动纠正错误的情况,一定要保证后面的id比前面大,一个流中信息条目的ID必须是单调增的,这是stream的基础。

Stream的数据类型就是Stream

127.0.0.1:6379> type mystream
stream

2.2 XRANGE

获取消息列表(可以指定范围),忽略删除的消息

  • key 对应的stream
  • start 开始值,用-表示最小
  • end 结束值,用+表示最小
  • count 限制条数
XRANGE <key> <start> <end> [Count <count>]

127.0.0.1:6379> xrange mystream - + 
1) 1) "1758432825026-0"
   2) 1) "name"
      2) "lzj"
      3) "age"
      4) "28"
2) 1) "1758432832463-0"
   2) 1) "name"
      2) "xxn"
      3) "age"
      4) "24"

2.3 XREVRANGE

和XRANGE相比,区别在于反向获取,ID从大到小

127.0.0.1:6379> xrevrange mystream + - 
1) 1) "1758432832463-0"
   2) 1) "name"
      2) "xxn"
      3) "age"
      4) "24"
2) 1) "1758432825026-0"
   2) 1) "name"
      2) "lzj"
      3) "age"
      4) "28"

2.4 XDEL

删除消息,按照ID删除

XDEL <key> <id> [<id ...>]

XDEL mystream 1748835967616-0

2.5 XLEN

获取Stream中的消息长度

127.0.0.1:6379> XLEN mystream
2

2.6 XTRIM

限制Stream的长度,如果已经超长会进行截取

  • MAXLEN 允许的最大长度,对流进行修剪限制长度
  • MINID 允许的最小id,从某个id值开始比该id值小的将会被抛弃
XTRIM <stream> MAXLEN|MINID <n>

127.0.0.1:6379> XTRIM mystream MAXLEN 2
4
127.0.0.1:6379> XTRIM mystream MINID 1748841640027-0
1

2.7 XREAD

获取消息(阻塞/非阻塞),返回大于指定ID的消息

  • COUNT 最多读取多少条
  • BLOCK 是否以阻塞的方式读取消息,默认不阻塞,如果miliseconds设置为0,表示永远阻塞
  • STREAMS 队列
  • ID 指定的ID,用$代表队列内现存最大的ID的后一个ID,用0-0(或0; 00)代表队列中最小的ID
XREAD [COUNT <count>] [BLOCK <milliseconds>] STREAMS <key> [<key ...>] ID [<id ...>]

例:用$代表队列内现存最大的ID的后一个ID,因为该ID并不存在所以会返回空

127.0.0.1:6379> Xrange mystream - +
1) 1) "1758432825026-0"
   2) 1) "name"
      2) "lzj"
      3) "age"
      4) "28"
2) 1) "1758432832463-0"
   2) 1) "name"
      2) "xxn"
      3) "age"
      4) "24"
127.0.0.1:6379> XREAD  STREAMS mystream  $
(nil)

例:用0-0代表队列中最小的ID,当指定为0-0的同时不指定count时,会返回队列中所有的元素

127.0.0.1:6379> Xrange mystream - +
1) 1) "1758432825026-0"
   2) 1) "name"
      2) "lzj"
      3) "age"
      4) "28"
2) 1) "1758432832463-0"
   2) 1) "name"
      2) "xxn"
      3) "age"
      4) "24"
127.0.0.1:6379> XREAD  STREAMS mystream  0-0
1) 1) "mystream"
   2) 1) 1) "1758432825026-0"
         2) 1) "name"
            2) "lzj"
            3) "age"
            4) "28"
      2) 1) "1758432832463-0"
         2) 1) "name"
            2) "xxn"
            3) "age"
            4) "24"

例:阻塞:监听mystream中比最新的一条还靠后的一条,读取不到就会阻塞,一直监听

127.0.0.1:6379> XREAD COUNT 1 BLOCK 0  STREAMS mystream $

打开另一个命令窗口,生产消息

127.0.0.1:6379> xadd mystream * k1 v1
"1758434052255-0"

XREAD停止阻塞,打印出新生产的一条消息和等待时间

127.0.0.1:6379> XREAD COUNT 1 BLOCK 0  STREAMS mystream $
1) 1) "mystream"
   2) 1) 1) "1758434052255-0"
         2) 1) "k1"
            2) "v1"
(183.38s)

3.消费(组)相关命令

3.1 XGROUP CREATE

创建消费组

  • stresm 队列
  • group 消费组
  • id 创建消费组时必须指定ID,0代表从头消费,$代表从队尾消费(只消费最新消息)
XGROUP CREATE <stresm> <group> <id>

例:

127.0.0.1:6379> xgroup create mystream group1 $
OK
127.0.0.1:6379> xgroup create mystream group2 0
OK

3.2 XREADGROUP GROUP

允许多个消费者作为一个组来合作消费同一个stream中的消息,同一个stream中的消息一旦被消费组里面的一个消费者读取了,同组的其他消费者就无法再次读取

  • group 消费组
  • consumer 消费者
  • stream 队列
  • id 指定从哪个ID开始读取,特殊写法:>代表获取组内未分发给其他消费者的新消息(未分发必然未ACK),0代表获取已分发但未被消费者ACK的消息
  • count 读取的数量,可以用于每个消费者读取一部分消息,实现消费的负载均衡
XREADGROUP GROUP <group> <consumer> [COUNT <count>] STREAMS <stream, ...> <id>

例:同组消费者不能重复消费消息

先建两个消费组

127.0.0.1:6379> xgroup create mystream groupA 0
OK
127.0.0.1:6379> xgroup create mystream groupB 0
OK

groupA里面的新建消费者consumer1进行消费

127.0.0.1:6379> XREADGROUP GROUP groupA consumer1 STREAMS mystream >

1) 1) "mystream"
   2) 1) 1) "1758432825026-0"
         2) 1) "name"
            2) "lzj"
            3) "age"
            4) "28"
      2) 1) "1758432832463-0"
         2) 1) "name"
            2) "xxn"
            3) "age"
            4) "24"
      3) 1) "1758434052255-0"
         2) 1) "k1"
            2) "v1"

groupA中再去新建消费者consumer2进行消费,无法再消费消息

127.0.0.1:6379> XREADGROUP GROUP groupA consumer2 STREAMS mystream >
(nil)

但是,groupB中新建一个消费者取消费,可以读取到消息

127.0.0.1:6379> XREADGROUP GROUP groupB consumer1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1758432825026-0"
         2) 1) "name"
            2) "lzj"
            3) "age"
            4) "28"
      2) 1) "1758432832463-0"
         2) 1) "name"
            2) "xxn"
            3) "age"
            4) "24"
      3) 1) "1758434052255-0"
         2) 1) "k1"
            2) "v1"

例:负载均衡的消费,新建消费组groupC,三个消费者每个消费一条消息

127.0.0.1:6379> xgroup create mystream groupC 0
OK
127.0.0.1:6379> XREADGROUP GROUP groupC consumer1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1758432825026-0"
         2) 1) "name"
            2) "lzj"
            3) "age"
            4) "28"
127.0.0.1:6379> XREADGROUP GROUP groupC consumer2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1758432832463-0"
         2) 1) "name"
            2) "xxn"
            3) "age"
            4) "24"
127.0.0.1:6379> XREADGROUP GROUP groupC consumer3 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1758434052255-0"
         2) 1) "k1"
            2) "v1"

3.3 消息的ACK机制

基于Stream的消息,怎样保证消费者发生故障或宕机以后,仍然能读取未处理完的消息?Stream采用的是一个内部队列pending_list,记录消费组中消费者的读取记录,直到消费者使用xack命令来通知stream消息已经处理完成,这种消费确认机制增强了消息的可靠性。

刚刚的消费操作仅仅是对消息的读取,实际上并没有ACK“签收”

例:通过命令XPENDING查询groupC中已读取,但未确认的消息

127.0.0.1:6379> XPENDING mystream groupC
1) (integer) 3 #总数
2) "1758432825026-0" #起始ID
3) "1758434052255-0" #结束ID
4) 1) 1) "consumer1" #每个消费组消费的消息数量
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

例:通过命令XPENDING查询groupB中consumer1已读取,但未确认的消息,从小到大查询10个

127.0.0.1:6379> XPENDING mystream groupB - + 10  consumer1
1) 1) "1758432825026-0"
   2) "consumer1"
   3) (integer) 2848434
   4) (integer) 1
2) 1) "1758432832463-0"
   2) "consumer1"
   3) (integer) 2848434
   4) (integer) 1
3) 1) "1758434052255-0"
   2) "consumer1"
   3) (integer) 2848434
   4) (integer) 1

例:使用XACK向消息队列确认groupB组的消息1758432825026-0已经处理完成

127.0.0.1:6379> XACK mystream groupB 1758432825026-0
(integer) 1
127.0.0.1:6379> XPENDING mystream groupB - + 10  consumer1
1) 1) "1758432832463-0"
   2) "consumer1"
   3) (integer) 3301753
   4) (integer) 1
2) 1) "1758434052255-0"
   2) "consumer1"
   3) (integer) 3301753
   4) (integer) 1

4.XINFO

XINFO命令用于打印出一些stream结构相关的信息

例:XINFO stream打印出mystream队列的详细信息

127.0.0.1:6379> XINFO stream  mystream
 1) "length"
 2) (integer) 3
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1758434052255-0"
 9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 3
13) "recorded-first-entry-id"
14) "1758432825026-0"
15) "groups"
16) (integer) 5
17) "first-entry"
18) 1) "1758432825026-0"
    2) 1) "name"
       2) "lzj"
       3) "age"
       4) "28"
19) "last-entry"
20) 1) "1758434052255-0"
    2) 1) "k1"
       2) "v1"

例:XINFO groups打印出mystream队列队列上存在的消费组信息

127.0.0.1:6379> XINFO  groups mystream
1)  1) "name"
    2) "group1"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1758434052255-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0
2)  1) "name"
    2) "group2"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "0-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 3
3)  1) "name"
    2) "groupA"
    3) "consumers"
    4) (integer) 2
    5) "pending"
    6) (integer) 3
    7) "last-delivered-id"
    8) "1758434052255-0"
    9) "entries-read"
   10) (integer) 3
   11) "lag"
   12) (integer) 0
4)  1) "name"
    2) "groupB"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1758434052255-0"
    9) "entries-read"
   10) (integer) 3
   11) "lag"
   12) (integer) 0
5)  1) "name"
    2) "groupC"
    3) "consumers"
    4) (integer) 3
    5) "pending"
    6) (integer) 3
    7) "last-delivered-id"
    8) "1758434052255-0"
    9) "entries-read"
   10) (integer) 3
   11) "lag"
   12) (integer) 0

"如果文章对您有帮助,可以请作者喝杯咖啡吗?"

微信二维码

微信支付

支付宝二维码

支付宝


Redis数据结构之Stream
https://blog.liuzijian.com/post/redis-data-structure-stream.html
作者
Liu Zijian
发布于
2024年10月16日
更新于
2025年9月22日
许可协议