1. 发布确认
在RabbitMQ不可用的情况下,如何处理无法投递的消息?
在application.yml配置文件中添加
# 消息确认类型 执行回调
publisher-confirm-type: correlated
补充:
- NONE 禁用发布确认模式,是默认值
- CORRELATED 发布消息成功到交换器后会触发回调方法
SIMPLE 包含两种效果:
- 和 CORRELATED 值一样会触发回调方法
- 在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
(1)队列配置
@Configuration
public class ConfirmQueueConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
}
}
(2)消息生产者
@Slf4j
@RestController
@RequestMapping("/confirm")
public class SendConfirmMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}/{routingKey}")
@ApiOperation("发送消息(发布确认)")
public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) {
log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey);
//正常发送
rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString()));
}
}
(3)回调接口
@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 注入
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机确认回调方法
*
* @param correlationData correlation data for the callback. 回调消息的ID及相关的信息
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (correlationData != null) {
String id = correlationData.getId() != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机收到ID:{} 的消息", id);
} else {
log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause);
}
} else {
log.error("消息接收发生了异常!");
}
}
}
(4)消费者
@Slf4j
@Component
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmQueueConfig.CONFIRM_QUEUE)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
log.info("接收到 {} 队列的消息:{}", ConfirmQueueConfig.CONFIRM_QUEUE, msg);
}
}
(5)测试
可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了
2.回退消息
(1)Mandatory 参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,并且生产者是不知道消息被丢弃的。此时可通过设置 mandatory 参数,当消息在传递过程中不可达目的地时将消息返回给生产者
(2)包含回退的队列设置
@Configuration
public class ConfirmQueueConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
}
}
(3)消息生产者
@Slf4j
@RestController
@RequestMapping("/confirm")
public class SendConfirmMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}/{routingKey}")
@ApiOperation("发送消息(发布确认)")
public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) {
log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey);
//正常发送
rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString()));
}
}
(4)回调接口
@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 注入
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机确认回调方法
*
* @param correlationData correlation data for the callback. 回调消息的ID及相关的信息
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (correlationData != null) {
String id = correlationData.getId() != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机收到ID:{} 的消息", id);
} else {
log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause);
}
} else {
log.error("消息接收发生了异常!");
}
}
/**
* 消息在传递过程中不可达目的地时,进行消息回退
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息:{},被交换机:{}退回,路由key:{},原因:{}", new String(message.getBody()), exchange, routingKey, replyText);
}
}
(5)测试
参考未设置消息回退时的测试结果,设置回退后,未匹配routingkey的消息被返回到了队列中
3. 备份交换机
备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列
(1)队列配置
@Configuration
public class ConfirmQueueConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
public static final String BACKUP_EXCHANGE = "backup.exchange";
public static final String BACKUP_QUEUE = "backup.queue";
public static final String WARRING_QUEUE = "warring.queue";
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
}
@Bean("backupExchange")
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE);
}
@Bean("backupQueue")
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE).build();
}
@Bean("warringQueue")
public Queue warringQueue() {
return QueueBuilder.durable(WARRING_QUEUE).build();
}
@Bean
public Binding queueBindingBackupExchange(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding queueBindingWarringExchange(@Qualifier("warringQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
}
(5)报警消费者
@Slf4j
@Component
public class WarringConsumer {
@RabbitListener(queues = ConfirmQueueConfig.WARRING_QUEUE)
public void receiveWarringMsg(Message message) {
String msg = new String(message.getBody());
log.warn("发现不可路由消息:{}", msg);
}
}
(5)生产者、回调接口配置同理
(6)测试
注:mandatory 参数可与备份交换机同时使用,但备份交换机优先级最高
评论 (0)