Redis数据结构之Stream
本文最后更新于 2025年6月2日
未完待续
1.概述
Redis Stream是Redis 5.0版本新增加的数据结构。
Redis Stream主要用于消息队列(MQ,Message Queue),Redis本身是有一个Redis发布订阅(pub/sub)来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃,而且没有ACK机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了,简单来说发布订阅(pub/sub)可以分发消息,但无法记录历史消息。
而Redis Stream提供了消息的持久化和主备复制功能,支持自动生成全局唯一ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream是一个链表,会将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容
Redis Stream组成部分说明:
Message Content
消息
Consumer group
消费组,通过
XGROUP CREATE
命令创建,同一个消费组可以有多个消费者Last_delivered_id
游标,每个消费组会有个游标
last_delivered_id
,任意一个消费者读取了消息都会使游标last_delivered_id
往前移动Consumer
消费者,包含在消费组当中
Pending_ids
消费者会布一个状态变量,用于记录被当前消费者已读取但未ack的消息id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个
pending_ids
变量在Redis官方被称之为PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有ack(Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理
2.队列(生产)相关命令
2.1 XADD
添加消息到队列末尾,如果队列不存在,则会先创建队列
key
键*|id
消息的ID,格式必须是时间戳-序列号
这样的方式,下一条的ID必须比上一条的要大,写成*
号,系统将自动生成。ID比较大小时,先比较时间戳大小,若相同再比较序列号field value
键值对
XADD key *|id field value [field value ...]
例
127.0.0.1:6379> XADD mystream * name lzj age 28
1748835683065-0
127.0.0.1:6379> XADD mystream * name xxn age 24
1748835967616-0
返回的1748835683065-0
和1748835967616-0
就是消息的ID,-
前的数字为生成消息时的毫秒级时间戳,-
后面的数字代表同一毫秒内产生的第几个序列号
在相同的毫秒下序列号从0开始递增,序列号是64位长度,理论上在同一毫秒内生成的数据量无法到达这个级别,因此不用担心序列号会不够用。毫秒数取的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID,也即redis在增加信息条目时会检査当前id与上一条目的id,自动纠正错误的情况,一定要保证后面的id比前面大,一个流中信息条目的ID必须是单调增的,这是stream的基础。
Stream的数据类型就是Stream
127.0.0.1:6379> type mystream
stream
2.2 XRANGE
获取消息列表(可以指定范围),忽略删除的消息
key
对应的streamstart
开始值,用-
表示最小end
结束值,用+
表示最小count
限制条数
XRANGE key start end [Count count]
例
127.0.0.1:6379> xrange mystream - +
1748835683065-0
name
lzj
age
28
1748835967616-0
name
xxn
age
24
127.0.0.1:6379> xrange mystream - + Count 1
1748835683065-0
name
lzj
age
28
2.3 XREVRANGE
和XRANGE相比,区别在于反向获取,ID从大到小
例
127.0.0.1:6379> xrevrange mystream + -
1748835967616-0
name
xxn
age
24
1748835683065-0
name
lzj
age
28
127.0.0.1:6379> xrevrange mystream + - count 1
1748835967616-0
name
xxn
age
24
2.4 XDEL
删除消息,按照ID删除
XDEL key id [id ...]
例
XDEL mystream 1748835967616-0
2.5 XLEN
获取Stream中的消息长度
127.0.0.1:6379> XLEN mystream
2
2.6 XTRIM
限制Stream的长度,如果已经超长会进行截取
MAXLEN
允许的最大长度,对流进行修剪限制长度MINID
允许的最小id,从某个id值开始比该id值小的将会被抛弃
XTRIM mystream MAXLEN|MINID n
例
127.0.0.1:6379> XTRIM mystream MAXLEN 2
4
127.0.0.1:6379> XTRIM mystream MINID 1748841640027-0
1
2.7 XREAD
获取消息(阻塞/非阻塞),返回大于指定ID的消息
COUNT
最多读取多少条BLOCK
是否以阻塞的方式读取消息,默认不阻塞,如果miliseconds设置为0,表示永远阻塞
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
例
127.0.0.1:6379> xrange mystream - +
1748841640027-0
id
18
1748842099599-0
id
2323
1748842102200-0
id
232
1748842104391-0
id
23cecs
1748842107092-0
id
23ced32w2w
1748842109798-0
id
23ced3w2dw
1748842112394-0
id
23ced323dw2
3.消费相关命令
"如果文章对您有帮助,可以请作者喝杯咖啡吗?"

微信支付

支付宝