【备用】交换器

概述

有时我们可能会希望有客户端处理不能被路由的消息 (对应的RK没有消费者或者broker找不到对应的RK),作用大致有:

  • 监控客户端刻意或者偶尔发送的无法路由的消息,便于后续处理
  • 特殊消息需要特殊处理

“备用”交换器 Alternate Exhcange,简称AE,就是用于处理这种情况下的一种交换器

AE跟之前我们学过的普通的交换器一样,唯一的区别或许就是:普通交换器接收的消息是客户端直接推送的,而AE接收的消息是经过另一个交换器转发来的。 如果某个交换器配置了AE,则当某个消息无法路由到指定的队列的时候,就会尝试推送到配置的AE,如果AE仍无法路由,则会推送到AE配置的另一个AE,如此循环下去,直到消息能够被消费。 当然,如果消息最终还是未能路由,那么该消息就丢失了

配置AE

配置一个交换器的AE有两种方式

  • 定义交换器时通过配置参数 alternate-exchange

    ...
    Map<String, Object> exchangeArgs = new HashMap<>(1);
    exchangeArgs.put("alternate-exchange", ALTERNATE_EXCHANGE_NAME);
    channel.exchangeDeclare(BUSINESS_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, exchangeArgs);
    ...
    
  • 使用 policies

    rabbitmqctl set_policy AE "^my-direct$" '{"alternate-exchange":"my-ae"}' 
    

具体示例代码

  • 生产者程序

    ...
    try (Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel()) {
       for (int i = 1; i <= 8; i++) {
           String message = "NO." + i;
           channel.basicPublish(BUSINESS_EXCHANGE_NAME, RK2, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
           System.out.println(String.format("发送消息:[%s]成功", message));
       }
    }
    ...
    
  • 运行结果

    发送消息:[NO.1]成功
    发送消息:[NO.2]成功
    发送消息:[NO.3]成功
    发送消息:[NO.4]成功
    发送消息:[NO.5]成功
    发送消息:[NO.6]成功
    发送消息:[NO.7]成功
    发送消息:[NO.8]成功
    
  • 业务消费者

    channel.queueBind(QUEUE_NAME, BUSINESS_EXCHANGE_NAME, RK);
    
  • AE消费者

    ...
    channel.exchangeDeclare(ALTERNATE_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, ALTERNATE_EXCHANGE_NAME, "");
    
    channel.basicConsume(queueName, new DefaultConsumer(channel) {
        @Override
        public void handleConsumeOk(String consumerTag) {
            super.handleConsumeOk(consumerTag);
            System.out.println("准备接收alternative消息...");
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body);
            channel.basicAck(envelope.getDeliveryTag(), false);
            System.out.println(String.format("收到消息:[%s]", message));
        }
    });
    ...
    
  • 运行结果

    准备接收alternative消息...
    收到消息:[NO.1]
    收到消息:[NO.2]
    收到消息:[NO.3]
    收到消息:[NO.4]
    收到消息:[NO.5]
    收到消息:[NO.6]
    收到消息:[NO.7]
    收到消息:[NO.8]
    

程序中定义了一个AE名称为 ALTERNATE_EXCHANGE_NAME,类型为 fanout。业务消费者使用的是 RK,生产者使用的 RK2 因此无法路由到任何的队列中,这时消息就会推送到AE,进而进入到对应的AE绑定的队列中,最后被消费