RabbitMQ - 发布确认补充

suaxi
2023-06-23 / 0 评论 / 115 阅读 / 正在检测是否收录...

1. 发布确认

在RabbitMQ不可用的情况下,如何处理无法投递的消息?

1.确认机制方案.png

2.确认机制方案.png



在application.yml配置文件中添加

# 消息确认类型 执行回调
publisher-confirm-type: correlated

补充:

  • NONE 禁用发布确认模式,是默认值
  • CORRELATED 发布消息成功到交换器后会触发回调方法
  • SIMPLE 包含两种效果:

    • 和 CORRELATED 值一样会触发回调方法
    • 在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker


(1)队列配置

@Configuration
public class ConfirmQueueConfig {

    public static final String CONFIRM_EXCHANGE = "confirm.exchange";

    public static final String CONFIRM_QUEUE = "confirm.queue";
    
    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
    }
}


(2)消息生产者

@Slf4j
@RestController
@RequestMapping("/confirm")
public class SendConfirmMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{msg}/{routingKey}")
    @ApiOperation("发送消息(发布确认)")
    public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) {
        log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey);
        //正常发送
        rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString()));
    }
}


(3)回调接口

@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 注入
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机确认回调方法
     *
     * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (correlationData != null) {
            String id = correlationData.getId() != null ? correlationData.getId() : "";
            if (ack) {
                log.info("交换机收到ID:{} 的消息", id);
            } else {
                log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause);
            }
        } else {
            log.error("消息接收发生了异常!");
        }
    }
}


(4)消费者

@Slf4j
@Component
public class ConfirmConsumer {

    @RabbitListener(queues = ConfirmQueueConfig.CONFIRM_QUEUE)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("接收到 {} 队列的消息:{}", ConfirmQueueConfig.CONFIRM_QUEUE, msg);
    }
}


(5)测试

3.确认机制测试.png

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了


2.回退消息

(1)Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,并且生产者是不知道消息被丢弃的。此时可通过设置 mandatory 参数,当消息在传递过程中不可达目的地时将消息返回给生产者


(2)包含回退的队列设置

@Configuration
public class ConfirmQueueConfig {

    public static final String CONFIRM_EXCHANGE = "confirm.exchange";

    public static final String CONFIRM_QUEUE = "confirm.queue";

    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
    }
}


(3)消息生产者

@Slf4j
@RestController
@RequestMapping("/confirm")
public class SendConfirmMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{msg}/{routingKey}")
    @ApiOperation("发送消息(发布确认)")
    public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) {
        log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey);
        //正常发送
        rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString()));
    }
}


(4)回调接口

@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 注入
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机确认回调方法
     *
     * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (correlationData != null) {
            String id = correlationData.getId() != null ? correlationData.getId() : "";
            if (ack) {
                log.info("交换机收到ID:{} 的消息", id);
            } else {
                log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause);
            }
        } else {
            log.error("消息接收发生了异常!");
        }
    }

    /**
     * 消息在传递过程中不可达目的地时,进行消息回退
     * @param message the returned message.
     * @param replyCode the reply code.
     * @param replyText the reply text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息:{},被交换机:{}退回,路由key:{},原因:{}", new String(message.getBody()), exchange, routingKey, replyText);
    }
}


(5)测试

4.消息回退.png

参考未设置消息回退时的测试结果,设置回退后,未匹配routingkey的消息被返回到了队列中


3. 备份交换机

备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列

5.备份交换机.png


(1)队列配置

@Configuration
public class ConfirmQueueConfig {

    public static final String CONFIRM_EXCHANGE = "confirm.exchange";

    public static final String CONFIRM_QUEUE = "confirm.queue";

    public static final String CONFIRM_ROUTING_KEY = "key1";

    public static final String BACKUP_EXCHANGE = "backup.exchange";

    public static final String BACKUP_QUEUE = "backup.queue";

    public static final String WARRING_QUEUE = "warring.queue";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE);
    }

    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }

    @Bean("warringQueue")
    public Queue warringQueue() {
        return QueueBuilder.durable(WARRING_QUEUE).build();
    }

    @Bean
    public Binding queueBindingBackupExchange(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding queueBindingWarringExchange(@Qualifier("warringQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}


(5)报警消费者

@Slf4j
@Component
public class WarringConsumer {

    @RabbitListener(queues = ConfirmQueueConfig.WARRING_QUEUE)
    public void receiveWarringMsg(Message message) {
        String msg = new String(message.getBody());
        log.warn("发现不可路由消息:{}", msg);
    }
}


(5)生产者、回调接口配置同理


(6)测试

6.1备份交换机-测试.png

6.2备份交换机-测试.png

注:mandatory 参数可与备份交换机同时使用,但备份交换机优先级最高

0

评论 (0)

取消