首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,086 阅读
2
类的加载
742 阅读
3
Spring Cloud OAuth2.0
726 阅读
4
SpringBoot自动装配原理
691 阅读
5
集合不安全问题
587 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
FastApi
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
389
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
FastApi
页面
统计
关于
搜索到
1
篇与
的结果
2023-06-19
RabbitMQ - WorkQueue
工作队列(又称任务队列):其主要思想是避免立即执行资源密集型任务,而不得不等待它完成。消息队列将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。1. 轮询分发(1)抽取工具类public class RabbitMqUtils { /** * 获取Channel * * @return Channel * @throws IOException * @throws TimeoutException */ public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection.createChannel(); } }(2)工作线程public class Worker01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("收到消息:" + new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); //接收消息 System.out.println("c1等待接收消息..."); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } } public class Worker02 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("收到消息:" + new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); //接收消息 System.out.println("c2等待接收消息..."); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }(3)Task线程public class Task01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("发送了消息:" + msg); } } }(4)结果生产者一共发送了4条消息,消费者1和消费者2按次序分别接收了两条消息2. 消息应答(1)概念RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息也将丢失,因为它无法接收。为了保证消息在发送过程中不丢失,rabbitmq 引入了消息应答机制,即:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。(2)自动应答消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为在这种模式下,如果消息在接收到之前,消费者那边出现连接的问题或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率处理这些消息的情况下使用。(3)消息应答的方式Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功地处理了消息,可以将其丢弃了Channel.basicNack(用于否定确认)Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数,不处理该消息直接拒绝,表示可以将其丢弃了(4)Mutiplechannel.basicAck(deliveryTag, true)true 代表批量应答 channel 上未应答的消息,比如说 channel 上有传送 tag 为5,6,7,8的消息,当前的 tag 是 8,那么此时 5 - 8 的这些还未应答的消息都会被确认收到消息应答false 同上面相比 只会应答 tag = 8 的消息,5,6,7 这三个消息依然不会被确认收到消息应答(5)消息自动重新入队如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其进行重新排队。如果此时其他消费者(状态良好)可以处理任务,MQ将会很快将其重新分发给另一个消费者,这样,即使某个消费者遇到突发状况,也可以确保不会丢失任何消息。(6)Demo消息应答默认值为 true,即自动应答,此处以手动应答为例SleepUtilpublic class SleepUtils { public static void sleep(int seconds) { try { Thread.sleep(1000L * seconds); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }Producerpublic class Task02 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("生产者发送消息:" + msg); } } }Consumerpublic class Worker03 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("c1等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { SleepUtils.sleep(1); System.out.println("接收到的消息:" + new String(message.getBody())); //手动应答 /** * 参数说明: * 1.消息的标记 * 2.是否批量应答 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> System.out.println("消费者取消消费接口回调逻辑")); } } public class Worker04 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { SleepUtils.sleep(30); System.out.println("接收到的消息:" + new String(message.getBody())); //手动应答 /** * 参数说明: * 1.消息的标记 * 2.是否批量应答 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> System.out.println("消费者取消消费接口回调逻辑")); } }在发送者发送消息 xxx 后把 C2 消费者停掉,按理说该 C2 来处理的消息,但是由于它处理时间较长,在还没有执行 ack 代码的时候,C2 被停掉了,此时会看到消息被 C1 接收到了,说明消息 xxx 被重新入队,然后分配给能处理消息的 C1 消费者3. 消息持久化(1)概念默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会忽视队列和消息,除非告知它不要这样做,确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化(2)队列持久化之前创建的队列时都是非持久化的,rabbitmq 在重启之后该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为(true)持久化boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);注:如果之前声明的队列(相同的队列名)不是持久化的,需要把原先的队列先删除,或者重新创建一个不同名的持久化队列,不然会报错 received 'true' but current is 'false'在控制面板中 持久化 与 非持久化队列 的区别(3)消息持久化消息实现持久化需要修改生产者的代码,添加 MessageProperties.PERSISTENT_TEXT_PLAIN 属性# 未持久化 channel.basicPublish("", null, msg.getBytes()); # 持久化 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点,此时并没有真正写入磁盘。(4)不公平分发轮询分发在一些场景下并不是很好,比方说有两个消费者在处理任务,其中消费者1处理任务的速度非常快,而另外一个消费者2 处理速度却很慢,这个时候如果还是采用轮询分发的话就会让处理速度快的消费者在很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下并不太好,但此时 RabbitMQ 并不知道这种情况,它依然很公平的在进行消息分发,为了避免这种情况,可以设置参数 channel.basicQos(1)int prefetchCount = 2; channel.basicQos(prefetchCount);(5)预取值消息的发送是异步的,所以在任何时候,channel 上肯定不止只有一个消息,且消费者的手动确认本质上也是异步的,因此这里就存在一个未确认的消息缓冲区,所以要求开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。可以通过channel.basicQos() 方法设置”预取值“来完成,该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例:假设在通道上有未确认的消息 5,6,7,8,并且通道的预取计数为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。消息应答和 QoS 预取值对吞吐量有很大的影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗,应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接的节点的内存消耗变大,所以设置合适的预取值是一个反复试验的过程,不同的负载该值取值也不同,默认情况话建议100 到 300,这个范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险,预取值为 1 是最保守的,但这将使吞吐量变得很低,特别是消费者连接延迟很大的情况下//不公平分发 /** * prefetchCount 等于 0 公平分发(默认) * prefetchCount 等于 1 公平分发 * prefetchCount 大于1时 预取值 */ int prefetchCount = 2; channel.basicQos(prefetchCount);
2023年06月19日
42 阅读
0 评论
0 点赞