1 “死信”交换器介绍
“死信”交换器跟普通的各种类型的交换器是一样的,你完全可以像定义普通交换器那样定义它。 当消息变为”死信”的时候,就会被转发到设置的对应的”死信”交换器上。 那么消息在什么情况下会变成”死信”呢?
- 当消息被消费者使用
basic.reject
或者basic.nack
拒绝,并且requeue
参数设置为false
, 即被拒绝但是要求不要重新入队的消息。 注意:这种情况下一定要设置auto-delete
为false - 消息由于设置了TTL导致过期,这种情况下消息变为”死信”消息后,其原来的过期属性
expiration
会被移除,防止再次过期 - 消息对应的队列设置了队列大小限制,超过限制被丢弃的消息。 这种情况下,
overflow
参数必须是默认的drop-head
超出限制的消息才会被认为是”死信”消息,进而被推送到”死信”交换器上;如果设置为reject-public
则不会被认为是”死信”消息
请注意:当为队列设置了TTL时间,队列过期了,但是里面的消息并不会被看做”死信”消息
2 “死信”交换器的设置
2.1 创建”死信”交换器
“死信”交换器可以使用RabbitMQ的 WEB UI 控制台进行添加。 当然,你仍然可以像创建普通交换器那样,在代码创建”死信”交换器
2.2 “死信”交换器的设置
可以通过设置队列的相应参数来控制该队列中”死信”消息需要转发到的具体的”死信”交换器,设置参数的方法有两种:通过定义队列是设置队列参数以及通过 policies
来控制
使用
policies
来控制
设置 dead-letter-exchange
属性,指定具体的”死信”交换器。 也可以设置 dead-letter-routing-key
属性来控制推送消息到指定的”死信”队列时使用的 RoutingKey
,如果没有设置,则默认会使用该消息本身已经拥有的 RoutingKey
来路由消息
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
使用队列参数控制
定义队列时使用队列参数定义:设置 x-dead-letter-exchange
和 x-dead-letter-routing-key
参数。
Map<String, Object> queueArgs = new HashMap<>(4);
queueArgs.put("x-dead-letter-exchange", "dead.letter.test.yb");
channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs);
“死信”消息 – 消费者拒绝消息且要求不重新入队
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
};
“死信”消息 – TTL消息超时: 如果消息在队列中3s之内不能分发到对应的消费者,则该消息就会超时被转发到”死信”交换器
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
.builder()
.expiration("3000")
.build();
channel.basicPublish(EXCHANGE_NAME, "RK-DEAD-LETTER-001", basicProperties, message.getBytes());
“死信”消息 – 队列消息超限: 队列中如果已经含有2条消息未分发到消费者,则当新消息进来时就会将原来的消息依次丢弃进入对应的”死信”交换器。 注意:
overflow
参数必须保证为默认的drop-head
,否则将不会变为 “死信” 消息
Map<String, Object> queueArgs = new HashMap<>(4);
queueArgs.put("x-max-length", 2); // 最多容纳2条未消费的消息
channel.queueDeclare(QUEUE_NAME, false, false, false, queueArgs);
当该队列中的消息发生”死信” (消息被拒绝、消息在队列中超时、消息数量超限被丢弃) 时,该消息会通过其原来的 RoutingKey
(或者 x-dead-letter-routing-key
参数指定的值) 推送到 dead.letter.test.yb
交换器,进而推送到对应的消费队列中
3 “死信”消息的 headers
“死信”消息的 headers
中法跟其他消息有所不同,其 headers
如下:
{
x-first-death-exchange=test-dead-exchange,
x-first-death-reason=expired,
x-first-death-queue=test-dead-queue-001,
x-death=[{
reason=expired,
original-expiration=3000,
count=1,
exchange=test-dead-exchange,
time=Wed Jun 05 15:24:12 CST 2019,
routing-keys=[RK-DEAD-LETTER-001],
queue=test-dead-queue-001}
]
}
“死信”消息会在 headers
中添加一个名为 x-death
的数组,数组中的key字段意义大致如下:
reason
:成为”死信”消息的原因,其值有:expired
、rejected
、maxlen
original-expiration
: 该消息原来定义的超时时间count
:该消息在当前这个队列中由于当前这个原因被”死信”的次数exchange
:该消息原来所在的交换器time
:成为”死信”消息的时间routing-keys
:该消息对应的RKqueue
:该消息原来所在的队列
除此之外,还增加有 x-first-death-exchange
,x-first-death-reason
和 x-first-death-queue
三个属性,从命名上就可以知道,这三个标识的是该消息首次成为”死信”消息对应的 x-death
中的 exchange
,reason
,queue
三个值,不同的是这三个值后续不会再改变。 当成为”死信”消息到达对应的”死信”队列后,如果记录进行处理推送,如果再次变成”死信”则 x-death
字段中就会有多个对象出现,count
值就会相应增加