Redis数据结构之Stream

本文最后更新于 2025年6月2日

未完待续

1.概述

Redis Stream是Redis 5.0版本新增加的数据结构。

Redis Stream主要用于消息队列(MQ,Message Queue),Redis本身是有一个Redis发布订阅(pub/sub)来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃,而且没有ACK机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了,简单来说发布订阅(pub/sub)可以分发消息,但无法记录历史消息。

而Redis Stream提供了消息的持久化和主备复制功能,支持自动生成全局唯一ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream是一个链表,会将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容

Redis Stream组成部分说明:

  • Message Content

    消息

  • Consumer group

    消费组,通过XGROUP CREATE命令创建,同一个消费组可以有多个消费者

  • Last_delivered_id

    游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id往前移动

  • Consumer

    消费者,包含在消费组当中

  • Pending_ids

    消费者会布一个状态变量,用于记录被当前消费者已读取但未ack的消息id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

2.队列(生产)相关命令

2.1 XADD

添加消息到队列末尾,如果队列不存在,则会先创建队列

  • key
  • *|id 消息的ID,格式必须是时间戳-序列号这样的方式,下一条的ID必须比上一条的要大,写成*号,系统将自动生成。ID比较大小时,先比较时间戳大小,若相同再比较序列号
  • field value 键值对
XADD key *|id field value [field value ...]

127.0.0.1:6379> XADD mystream * name lzj age 28
1748835683065-0
127.0.0.1:6379> XADD mystream * name xxn age 24
1748835967616-0

返回的1748835683065-01748835967616-0就是消息的ID,-前的数字为生成消息时的毫秒级时间戳,-后面的数字代表同一毫秒内产生的第几个序列号

在相同的毫秒下序列号从0开始递增,序列号是64位长度,理论上在同一毫秒内生成的数据量无法到达这个级别,因此不用担心序列号会不够用。毫秒数取的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID,也即redis在增加信息条目时会检査当前id与上一条目的id,自动纠正错误的情况,一定要保证后面的id比前面大,一个流中信息条目的ID必须是单调增的,这是stream的基础。

Stream的数据类型就是Stream

127.0.0.1:6379> type mystream
stream

2.2 XRANGE

获取消息列表(可以指定范围),忽略删除的消息

  • key 对应的stream
  • start 开始值,用-表示最小
  • end 结束值,用+表示最小
  • count 限制条数
XRANGE key start end [Count count]

127.0.0.1:6379> xrange mystream - + 
1748835683065-0
name
lzj
age
28
1748835967616-0
name
xxn
age
24
127.0.0.1:6379> xrange mystream - + Count  1
1748835683065-0
name
lzj
age
28

2.3 XREVRANGE

和XRANGE相比,区别在于反向获取,ID从大到小

127.0.0.1:6379> xrevrange mystream + -  
1748835967616-0
name
xxn
age
24
1748835683065-0
name
lzj
age
28
127.0.0.1:6379> xrevrange mystream + -  count 1
1748835967616-0
name
xxn
age
24

2.4 XDEL

删除消息,按照ID删除

XDEL key id [id ...]

XDEL mystream 1748835967616-0

2.5 XLEN

获取Stream中的消息长度

127.0.0.1:6379> XLEN mystream
2

2.6 XTRIM

限制Stream的长度,如果已经超长会进行截取

  • MAXLEN 允许的最大长度,对流进行修剪限制长度
  • MINID 允许的最小id,从某个id值开始比该id值小的将会被抛弃
XTRIM mystream MAXLEN|MINID n

127.0.0.1:6379> XTRIM mystream MAXLEN 2
4
127.0.0.1:6379> XTRIM mystream MINID 1748841640027-0
1

2.7 XREAD

获取消息(阻塞/非阻塞),返回大于指定ID的消息

  • COUNT 最多读取多少条
  • BLOCK 是否以阻塞的方式读取消息,默认不阻塞,如果miliseconds设置为0,表示永远阻塞
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

127.0.0.1:6379> xrange mystream - +
1748841640027-0
id
18
1748842099599-0
id
2323
1748842102200-0
id
232
1748842104391-0
id
23cecs
1748842107092-0
id
23ced32w2w
1748842109798-0
id
23ced3w2dw
1748842112394-0
id
23ced323dw2

3.消费相关命令


"如果文章对您有帮助,可以请作者喝杯咖啡吗?"

微信二维码

微信支付

支付宝二维码

支付宝


Redis数据结构之Stream
https://blog.liuzijian.com/post/redis-data-structure-stream.html
作者
Liu Zijian
发布于
2024年10月16日
更新于
2025年6月2日
许可协议