消息队列是典型的生产者-消费者模型,生产者只管发送消息,消费者只管监听队列取出消息,没有业务逻辑的侵入,实现了生产者和消费者的解耦,这篇博客主要从如下几个方面整理相关RabbitMQ的知识点
- 消息中间件在项目中的使用场景
- RabbitMQ常见的五种消息模型
- 如何避免消息的丢失
- SpringAMQP
目前比较主流的两种MQ分别是JMS和AMQP
- AMQP(Advance Message Queuing Protocol) 高级消息队列协议,他是一种协议,而不是具体的实现,换句话说,只要满足它的协议,使用什么客户端,什么语言实现都没关系!
- JMS(Java MessageService) 实际上是JMS API ,是sun公司提出来的消息标准
相同点: 两个担任的角色差不多,都是依据接口实现服务间的调用
不同点: 前者跨平台,跨语言,支持五种消息模型,后者只适用于java,仅规定了两种消息模型
常见的MQ产品
- ActiveMQ: 基于JMS
- RabbitMQ: 基于AMQP协议,erlang语言开发,稳定性好
- RockerMQ: 基于JMS,阿里巴巴产品,目前交由Apache基金会
- Kafka:分布式消息系统,高吞吐量
一. 使用场景
1. 异步处理
假设这样一个应用场景:
新用户来注册了,我们要求他填写手机的验证码,前端异步把手机号发送到后台,调用短息微服务发送短信,这时,用户可以接着填写剩下的信息,等验证码来了,一并提交给后台
2. 应用解耦
假设这样一个应用场景:
商品微服务,调用A方法,修改了商品的信息,静态页面微服务得重新生成静态页,搜索微服务得重新创建新的文档,但是问题来了,有关静态页的生成的所有逻辑方法都在静态页微服务,搜索微服务雷同,我们总不至于在 A()里面去再写一遍关于生成静态页和创键新文档的逻辑吧,这是用RabbitMQ的作用来了
- 不同的微服务之间使用RabbitMQ进行通信, 商品修改了,那我们就使用Dirct模式,并把商品id当作消息体,发送给静态页微服务以及搜微服务就好了,他只管发送,不管别的微服务怎么处理实现了 应用间的解耦
3. 流量削峰
流量削峰一般在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再做后续处理.
二. 五种消息模型
一. HelloWorld
通过这个模型,可以知道,生产者想发送消息要做哪些准备工作 消费者想接收消息需要哪些准备工作,以及如何接收
- P(producer/ publisher):生产者,发送消息的服务.
- C(consumer):消费者,接收消息的服务,
- 红色区域就是MQ中的Q,可以把它理解成一个邮箱
- 首先信件来了不强求必须马上马去拿
- 其次,它是有最大容量的(受主机和磁盘的限制,是一个缓存区)
- 然后,不仅仅我们可以去拿新建,家人亲戚朋友,甚至是小偷,只要他有钥匙,就可以去拿(多个消费者监听同一个队列,争抢消息)
坐标
1 | <dependency> |
java代码,RabbitMQ-HelloWorld
生产者
1 | private final static String QUEUE_NAME = "simple_queue"; |
消费者
1 | private final static String QUEUE_NAME = "simple_queue"; |
消息确认ACK
如果上面的消费者出现异常的话,程序也就停止了,那我们的业务逻辑就没办法执行,因此我们禁用autoACK,选择手动ack
1 | /*@param queue the name of the queue |
在回调函数中添加:
1 | ...业务逻辑 |
这样,在RabbitMQ中的消息状态是这样的 Ready–>Uacked– 出现异常 –> Ready
二. Worker模型
工作队列,竞争消费模式,可以看到,通同一个队列我们绑定上了多个消费者,消费者争抢着消费消息,这可以有效的避免消息堆积,比如对于短信微服务集群来说就可以使用这种消息模型,来了请求,大家抢着消费掉,别等着
如何实现?
- 相对于上面的HelloWorld这其实就是相同的服务我们启动了多次罢了,自然就是这种架构
能者多劳?
给队列添加一条属性,不再是队列把任务平均分配开给消费者,而是让消费者,消费完了后,问队列要新的任务,这样能者多劳
1 | // 设置每个消费者同时只能处理一条消息 |
订阅模型分类
不同的订阅模型是根据交换机(Exchange)的类型划分的
订阅模型有三种
- Fanout: 广播模型,将消息发送给绑定给交换机的所有队列(因为他们使用的是同一个RoutingKey)
- Direct: 定向:把消息发送给拥有指定Routing Key (路由键)的队列
- Topic: 通配符,把消息传递给拥有 符合Routing Patten(路由模式)的队列
其中每个消费者拥有属于自己的队列,生产者直接把消息发送给交换机,由交换机决定到底把消息发送给谁
二. 广播模型–Fanout
我们看一下,如何做到,一条消息被多个消费者消费
- 这个模型的特点就是它在发送消息的时候,并没有指明Rounting Key , 或者说他指定了Routing Key,但是所有的消费者都知道,大家都能接收到消息,就像听广播
生产者
1 | public static void main(String[] argv) throws Exception { |
对于生产者来说,他不在去声明队列了, 获取完Channel之后,直接去创建交换机,然后发送消息
消费者:
1 | // 绑定队列到交换机 |
对于消费者,现在没有和它的队列直接绑定的生产者了,它要多做一件事,就是把自己的队列绑定到交换机上Exchange,当它们做完这件事之后,他们都会收到相应的消息
三 . 订阅模式–Direct定向
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
拥有不同的RoutingKey的消费者,会收到来自交换机的不同信息,而不是大家都使用同一个Routing Key ,和广播模型区分开来
生产者
1 | // 声明exchange,指定类型为direct |
消费者,可以绑定多个RoutingKey收到不同的消息
1 | // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete |
三 . 订阅模式–Topic通配
这个模式支持使用通配符
1 | // 声明exchange,指定类型为topic |
消费者
1 | // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete |
二. 持久化
1. ACK机制
- 上面的消费者的ACK机制可以有效的避免因为消费者端的出现异常而导致信息丢失
但是如果MQ挂了呢?
2. 持久化交换机
3. 持久化队列
4. 持久化消息
三. SpringAMQP
SpringAMQP帮我们实现了–生产者确认机制,对于不可路由的消息交换机会告诉生产者,使其重新发送
1. 环境搭建
坐标
1 | <dependency> |
配置文件
生产者
1 | rabbitmq: |
生产者使用AmqpTemplate模板发送消息
1 | try{ // 新增商品后, 发送消息, 路由键 消息体 |
消费端
他不需要AmqpTemplate模板发送消息,因此不配置
1 | rabbitmq: |
virtual-host,和当前用户绑定的虚拟主机名, 这就Oralce里面,不同限权的用户可以看到的界面,拥有的能力是不用的,在RabbitMQ中,用户只能看到和它相关的虚拟主机下面的信息
1 |
|