队列的长度限制
队列可以被设置大小,包括:队列中最大容纳的消息数目以及队列中容纳消息的最大总字节数。这两个值可以通过队列的参数 (x-max-length 和 x-max-length-bytes) 以及 policy (max-length 和 max-length-bytes) 来设置。
max-length|x-max-length表示队列中容纳的最多消息个数max-length-bytes|x-max-length-bytes表示队列中的消息最大总字节数
默认情况下,当设置了容量的队列中的消息达到了设置的最大容量值后,队列会将处于队列前面的那些消息(也就是比较旧的消息) 移除或者放入”死信队列”。如果需要改变这种溢出处理策略,则可以通过队列参数 overflow 或者 x-overflow 进行修改。
overflow|x-overflow定义队列溢出后数据处理策略
该参数有两个值 drop-head 和 reject-publish,默认是 drop-head。 如果被设置为 reject-publish 则如果收到新消息,则会直接拒绝并丢弃。如果这时生产者使用了 publish confirm 模式,则生产者会收到 nack 通知。 当一个生产者对应有多个消费者,如果其中一个消费者对应的队列满了,并且设置了 overflow=reject-publish,则生产者会收到 nack 通知。需要注意的是:如果设置了溢出策略为 reject-publish 则被丢弃的消息不会进入对应的 “死信交换器”
生产者程序:生产者连续推送8条消息到broker
...
for (int i = 1; i <= 8; i++) {
String message = "NO." + i;
channel.basicPublish(BUSINESS_EXCHANGE_NAME, RK, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(String.format("发送消息:[%s]成功", message));
}
...
业务消费者程序
...
channel.basicQos(1);
Map<String, Object> queueArgs = new HashMap<>(2);
queueArgs.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
queueArgs.put("x-max-length", 2);
channel.queueDeclare(QUEUE_NAME, false, false, true, queueArgs);
channel.queueBind(QUEUE_NAME, BUSINESS_EXCHANGE_NAME, RK);
// 消费消息
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
long deliveryTag = envelope.getDeliveryTag();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(deliveryTag, false);
System.out.println(String.format("消费消息:[%s],手动确认ack", message));
}
});
...
消费者设置 prefetch 为1,即 ack 确认前该消费者不能再接收消息, 并且收到消息后会sleep 3s的时间,为了让队列中能够累积消息
运行结果
业务消费者
消费消息:[NO.1],手动确认ack
消费消息:[NO.7],手动确认ack
消费消息:[NO.8],手动确认ack“死信”消息消费者
收到死信消息:[NO.2],手动回复ack
收到死信消息:[NO.3],手动回复ack
收到死信消息:[NO.4],手动回复ack
收到死信消息:[NO.5],手动回复ack
收到死信消息:[NO.6],手动回复ack
从程序运行结果中可以看出,当消费者消费了第一条消息后,由于sleep 3s 的原因,这时队列中的消息产生了堆积。 由于设置了 x-max-length 为 2,因此队列中只存了两条最新的消息 NO.7 和 NO.8,且 NO.2~NO.6 都变成了 “死信” 消息, 转发到了 “死信” 交换器 DEAD_LETTER_EXCHANGE_NAME
需要关注的部分
- 需要通过
channel.basicQos()方法设置一个合适的prefetch限制每个消费者在ack之前可以最多收到的消息数目。 如果没有设置,那么broker总会将队列中的消息立刻转发到对应的客户端,此时x-max-length的设置也就没有了意义 - 如果更改了队列溢出策略
overflow为reject-publish则超出限制的消息不会被 “死信”