优先级队列
从 3.5.0 版本开始,RabbitMQ提供了优先级队列的实现。如果要实现优先级队列需要两部操作即可:
通过队列参数定义具有优先级性质的队列
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-max-priority", 10);
channel.queueDeclare(queueName, true, false, true, arguments);
通过设置队列参数 x-max-priority 设置队列的优先级,队列的优先级支持 0~255 的数值,推荐使用的范围是: 1~10。 该属性设置的是该队列中的消息允许的 最大优先级数值,不支持使用 policy 设置,因为 policy 是可以随时更新改变的。
生产者推送消息的时候设置
AMQP.BasicProperties的priority为该消息具体的优先级数值
String[] messageAry = {"hello", "world", "test"};
for (String message : messageAry) {
AMQP.BasicProperties.Builder basicPropertiesBuilder = new AMQP.BasicProperties().builder();
if ("test".equalsIgnoreCase(message)) {
basicPropertiesBuilder.priority(6);
}
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, basicPropertiesBuilder.build(), message.getBytes());
System.out.println(String.format("send message[%s] over", message));
}
该数值范围为 0~255 默认不设置的时候是 0。 如果超过对应的队列中 x-max-priority 设置的最大值,则会被强制设置为 x-max-priority 指定的最大值
使用优先级队列,你可能需要关注这些
- 使用优先级队列,会存在内存、磁盘以及CPU的额外消耗,设置的优先级越高,消耗越大
- 如果消费者消费能力很强,可能不会有队列对其中的消息进行优先级排序的时间,那么这个时候或许结果看起来,设置的优先级并没有起到作用。 因此,通常会使用
channel.basicQos(1)设置prefetch - 当优先级和TTL同时使用时,可能会造成低优先级的已过期消息会被阻塞在高优先级未过期的消息后面,这些已过期的消息不会被传递给消费者,并且会计入队列的统计数据中
- 当优先级和队列最大限制一起使用时,可能会造成当队列达到最大限制时,还未消费的高优先级的消息被移除丢弃