Skip to content

Redis Stream是如何运作的

xread与xreadgroup的差异

xread可以让消费者不受限制地访问到所有或指定区间的消息。

xreadgroup可以让消费组中的消费者,访问到只属于自己的处于pending中的历史消息和接下来分发给自己的消息。

xread 和 xrange

xread 用于客户端订阅一系列 stream, 持久化的订阅模式.

xrange 用于实现范围查询.

Pending

在redis中存在消费组,消费组中有多个消费者

通过xreadgroup,redis会将消息分发给消费组中的某个消费者,此时,在消费组中,消息成为pending状态,并且属于该消费者,直到消费者调用xack 更改消息的状态。

通过xreadgroup,消费者只能访问到属于自己的历史pending消息。

通过xpending可以查看到指定区间或所有处于pending中的消息,包括分发对象消费者,分发次数,消息空闲时间.

通过消息空闲时间,可以进行所有权的转移。

所有权

每当消息通过消费组分发给其中一个消费者后,这条消息将属于这个消费者,直到转移所有权之前,消费组不会再将消息分发给其他消费者。

同样的,当消费者故障后,也是上述逻辑。

如何观察消费组的状态

通过xinfo,可以查看所有消费者的处理消息数,pending消息数,空闲等待时间,最后一条分发消息的id

通过xinfo可以判断消费组中消费者的活跃和负载状态

如何转移所有权

首先,通过xpending,可以查看所有pending消息的空闲等待时间和所属的消费者。

通过xclaim,可以指定空闲等待时间超过某个时间的消息,通过指定消息id的方式,将所有权转移给某个消费者。

xclaim可以指定空闲等待时间,而转移所有权后,对应消息的空闲等待时间会重置,所以不会出现多个消费者同时转移所有权造成的消息所有权错乱

另外,在6.2后的版本,通过xautoclaim,不需要指定消息id,就可以将指定数量的,超过指定空闲等待时间的消息,所有权转移到指定的消费者

死信队列

xpending可以查到所有pending消息的空闲等待时间和交付次数,可以根据交付次数来综合业务,将消息转移到死信队列中

概念

  • >, 表示请求从未发配给其他客户端的消息.
  • entry ID, <millisecondsTime>-<sequenceNumber>, 由毫秒时间戳和序列号组成的字符串, 两部分元素都为 int64, 每个 stream 内的消息必须保证消息 id 递增.
  • (1519073279157-0, 在 entry ID 前加(前缀, 表示start不包含该条消息
  • $, 表示最新 id
  • -, 表示最小 id
  • +, 表示最大 id

一些想法