1. 幂等性
(1)概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
(2)消息重复消费
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
(3)解决思路
一般使用全局 ID、唯一标识(时间戳、UUID)或 MQ 消息的 id tag 来判断,亦或是zi定义全局唯一 id,每次消费消息时先判断 id 对应的消息是否已消费过
(4)消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发送消息,这时候消费端就要实现幂等性,即消息永远不会被消费多次(即使多次收到了一样的消息)。
- 唯一ID + 指纹码机制,利用数据库主键去重
- 利用 redis 的原子性实现(setnx)
2. 优先级队列
(1)设置方式
在管理页面添加
在代码中添加
Map<String, Object> arguments = new HashMap<>();
//优先级范围 0-10(设置过大会存在性能问题)
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
设置优先级之后的队列
(2)测试
Producer
public class Producer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.123.88");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
* 创建队列
* 参数说明:
* 1.队列名称
* 2.是否持久化存储消息
* 3.是否进行消息共享(只供一个消费者进行消费)
* 4.是否自动删除(最后一个消费者断开连接后,是否自动删除该队列)
* 5.其他参数
*/
/**
* 2023-06-15偷懒 优先级队列
*/
Map<String, Object> arguments = new HashMap<>();
//优先级范围 0-10(设置过大会存在性能问题)
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
for (int i = 1; i < 10; i++) {
String msg = "hello world" + i;
if (i == 5) {
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
} else {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
}
}
}
Consumer
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.123.88");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody()));
CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费由于某些原因中断");
/**
* 接收消息
* 1.队列名称
* 2.消费成功后是否自动应答
* 3.未成功消费的回调
* 4.取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
3. 惰性队列
(1)使用场景
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念,惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或是由于维护停机等)而导致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存中,这样可以更快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间(同步操作),进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。
(2)两种模式
- default(默认模式):在 3.6.0 之前的版本无需做任何变更
- lazy 模式:即惰性队列模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。注:如果要通过声明的方式改变已有队列的模式,那么只能先删除队列,然后再重新声明一个新的队列,同时已产生的消息会同步删除。
在管理界面设置
在代码中设置
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-mode", "lazy");
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
评论 (0)