队列的长度限制

队列的长度限制

队列可以被设置大小,包括:队列中最大容纳的消息数目以及队列中容纳消息的最大总字节数。这两个值可以通过队列的参数 (x-max-lengthx-max-length-bytes) 以及 policy (max-lengthmax-length-bytes) 来设置。

  • max-length | x-max-length 表示队列中容纳的最多消息个数
  • max-length-bytes | x-max-length-bytes 表示队列中的消息最大总字节数

默认情况下,当设置了容量的队列中的消息达到了设置的最大容量值后,队列会将处于队列前面的那些消息(也就是比较旧的消息) 移除或者放入”死信队列”。如果需要改变这种溢出处理策略,则可以通过队列参数 overflow 或者 x-overflow 进行修改。

  • overflow | x-overflow 定义队列溢出后数据处理策略

该参数有两个值 drop-headreject-publish,默认是 drop-head。 如果被设置为 reject-publish 则如果收到新消息,则会直接拒绝并丢弃。如果这时生产者使用了 publish confirm 模式,则生产者会收到 nack 通知。 当一个生产者对应有多个消费者,如果其中一个消费者对应的队列满了,并且设置了 overflow=reject-publish,则生产者会收到 nack 通知。需要注意的是:如果设置了溢出策略为 reject-publish 则被丢弃的消息不会进入对应的 “死信交换器”

生产者程序:生产者连续推送8条消息到broker

...
for (int i = 1; i <= 8; i++) {
    String message = "NO." + i;
    channel.basicPublish(BUSINESS_EXCHANGE_NAME, RK, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    System.out.println(String.format("发送消息:[%s]成功", message));
}
...

业务消费者程序

...
channel.basicQos(1);

Map<String, Object> queueArgs = new HashMap<>(2);
queueArgs.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
queueArgs.put("x-max-length", 2);
channel.queueDeclare(QUEUE_NAME, false, false, true, queueArgs);
channel.queueBind(QUEUE_NAME, BUSINESS_EXCHANGE_NAME, RK);

// 消费消息
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body);
        long deliveryTag = envelope.getDeliveryTag();

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        channel.basicAck(deliveryTag, false);
        System.out.println(String.format("消费消息:[%s],手动确认ack", message));

    }
});
...

消费者设置 prefetch 为1,即 ack 确认前该消费者不能再接收消息, 并且收到消息后会sleep 3s的时间,为了让队列中能够累积消息

运行结果

  • 业务消费者

    消费消息:[NO.1],手动确认ack
    消费消息:[NO.7],手动确认ack
    消费消息:[NO.8],手动确认ack

  • “死信”消息消费者

    收到死信消息:[NO.2],手动回复ack
    收到死信消息:[NO.3],手动回复ack
    收到死信消息:[NO.4],手动回复ack
    收到死信消息:[NO.5],手动回复ack
    收到死信消息:[NO.6],手动回复ack

从程序运行结果中可以看出,当消费者消费了第一条消息后,由于sleep 3s 的原因,这时队列中的消息产生了堆积。 由于设置了 x-max-length2,因此队列中只存了两条最新的消息 NO.7NO.8,且 NO.2~NO.6 都变成了 “死信” 消息, 转发到了 “死信” 交换器 DEAD_LETTER_EXCHANGE_NAME

需要关注的部分

  • 需要通过 channel.basicQos() 方法设置一个合适的 prefetch 限制每个消费者在 ack 之前可以最多收到的消息数目。 如果没有设置,那么broker总会将队列中的消息立刻转发到对应的客户端,此时 x-max-length 的设置也就没有了意义
  • 如果更改了队列溢出策略 overflowreject-publish 则超出限制的消息不会被 “死信”