概述
有时我们可能会希望有客户端处理不能被路由的消息 (对应的RK没有消费者或者broker找不到对应的RK),作用大致有:
- 监控客户端刻意或者偶尔发送的无法路由的消息,便于后续处理
- 特殊消息需要特殊处理
“备用”交换器 Alternate Exhcange,简称AE,就是用于处理这种情况下的一种交换器
AE跟之前我们学过的普通的交换器一样,唯一的区别或许就是:普通交换器接收的消息是客户端直接推送的,而AE接收的消息是经过另一个交换器转发来的。 如果某个交换器配置了AE,则当某个消息无法路由到指定的队列的时候,就会尝试推送到配置的AE,如果AE仍无法路由,则会推送到AE配置的另一个AE,如此循环下去,直到消息能够被消费。 当然,如果消息最终还是未能路由,那么该消息就丢失了
配置AE
配置一个交换器的AE有两种方式
定义交换器时通过配置参数
alternate-exchange... Map<String, Object> exchangeArgs = new HashMap<>(1); exchangeArgs.put("alternate-exchange", ALTERNATE_EXCHANGE_NAME); channel.exchangeDeclare(BUSINESS_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, exchangeArgs); ...使用
policiesrabbitmqctl set_policy AE "^my-direct$" '{"alternate-exchange":"my-ae"}'
具体示例代码
生产者程序
... try (Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel()) { for (int i = 1; i <= 8; i++) { String message = "NO." + i; channel.basicPublish(BUSINESS_EXCHANGE_NAME, RK2, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(String.format("发送消息:[%s]成功", message)); } } ...运行结果
发送消息:[NO.1]成功 发送消息:[NO.2]成功 发送消息:[NO.3]成功 发送消息:[NO.4]成功 发送消息:[NO.5]成功 发送消息:[NO.6]成功 发送消息:[NO.7]成功 发送消息:[NO.8]成功
业务消费者
channel.queueBind(QUEUE_NAME, BUSINESS_EXCHANGE_NAME, RK);
AE消费者
... channel.exchangeDeclare(ALTERNATE_EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, ALTERNATE_EXCHANGE_NAME, ""); channel.basicConsume(queueName, new DefaultConsumer(channel) { @Override public void handleConsumeOk(String consumerTag) { super.handleConsumeOk(consumerTag); System.out.println("准备接收alternative消息..."); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); channel.basicAck(envelope.getDeliveryTag(), false); System.out.println(String.format("收到消息:[%s]", message)); } }); ...运行结果
准备接收alternative消息... 收到消息:[NO.1] 收到消息:[NO.2] 收到消息:[NO.3] 收到消息:[NO.4] 收到消息:[NO.5] 收到消息:[NO.6] 收到消息:[NO.7] 收到消息:[NO.8]
程序中定义了一个AE名称为 ALTERNATE_EXCHANGE_NAME,类型为 fanout。业务消费者使用的是 RK,生产者使用的 RK2 因此无法路由到任何的队列中,这时消息就会推送到AE,进而进入到对应的AE绑定的队列中,最后被消费