目录

RabbitMQ从入门到实战-2

RabbitMQ从入门到实战-2

这篇讲解通过java客户端操作rabbitmq

Java客户端

可以通过不同客户端操作mq

我们这里用spring AMQP(AMQP是一种协议)

https://i-blog.csdnimg.cn/direct/9a291d14de444db89d1174050cbba728.png

快速入门

交换机需要和队列进行绑定,而我们的消费者需要监听队列,生产者指定生产到哪个队列

这里我们先不发交换机,直接发送到队列,为了简单一点

自己创建队列哈,这里就不说了

https://i-blog.csdnimg.cn/direct/f640e9dc7d1044b19cc17988115b519e.png

使用springAMQP过程

https://i-blog.csdnimg.cn/direct/b606c3bff8884324befacca44cabb8b4.png

这里配置根据自己的情况不同调整

https://i-blog.csdnimg.cn/direct/bfbb7cd950594f2caccb908b0e5d0e56.png

提供生产者发送消息

提供的也是一个template,convertAndSend方法去发送消息,测试类中编写完直接运行即可

https://i-blog.csdnimg.cn/direct/a0e480c37d6f44c0b7b56d80e9c26b32.png

提供消费者监听simple.queue,这里的接收参数和我们发送参数类型一致,spring会帮我们自动转化并接收执行方法中的内容

然后将项目启动就可以监听了

https://i-blog.csdnimg.cn/direct/5972583c06c74983b9947087e756171b.png

https://i-blog.csdnimg.cn/direct/ced2808310484badb7aba6cb47870440.png

WorkQueue(多消费)

一个队列,多个消费者

消费者之间属于竞争关系(队列中的一个数据只能被一个消费者获取)

这点和交换机与队列的关系不同(由交换)

消费者从queue中获取数据,进行处理,如果不设置其他操作,每一个消费者只需要处理n/m个数据(n为生产者生成数,m为消费者消费数)

下面我们在一个类建了两个消费者方法,一般不会这样做(因为他们消耗的是同一个机器资源)

正常我们会写一个消费者方法,复制多个springboot项目实例部署,形成集群

启动多个消费者,绑定到这个队列,这样里面的代码逻辑也相同,减轻单个消费者服务器的压力

https://i-blog.csdnimg.cn/direct/6582eb6462124007b6e104950ed079c7.png

生产者发50条数据

https://i-blog.csdnimg.cn/direct/7b6c593e269e42f6a0bdd89b0c22919f.png

能者多劳配置

新需求

让消费者1每秒处理40条消息,消费者2每秒处理5条消息

代码改造

https://i-blog.csdnimg.cn/direct/934dd88ae67e458f9fdb8e13ab4632c4.png

出现问题

我们消息消费速度跟不上消息分配速度

我们消息分配还是一人一条这样分配,先给1再给2

其实在1和2处理完之前按就分配完了,1和2都分配了50条,就是平均分配

这样没有考虑到谁消费的快的问题,这样的话,2处理的慢,你还给人分50条就太离谱了,影响整体性能

如果可以让1消费完去帮助2消费就可以了

https://i-blog.csdnimg.cn/direct/fde701d101c84226a2c369bce85b4461.png

那么如何让先消费完的消费者优先获取消息呢?

如何发挥我们MQ消费者最大性能(处理完的先分配)

而不是轮流给,做以下设置

保证同一时刻给一条信息,处理完就会再给,不空闲

这样处理快的可能处理2条的同时慢的处理1条,发挥最大效能

在消费者项目的配置文件中进行配置

https://i-blog.csdnimg.cn/direct/ad8242e8d0ca4ee092a92fea4e6d498d.png

这样就实现了最大效率

https://i-blog.csdnimg.cn/direct/5f25e657ad99403e93c639271b29b277.png

交换机

交换机绑定队列,由其交换机类型决定发送的到绑定队列的方式

发送到交互机要写三个参数,(交换机名,队列名,信息)

https://i-blog.csdnimg.cn/direct/13603d8dc96940c19be63a39c7f099b0.png

https://i-blog.csdnimg.cn/direct/f46bf71057f74dceaf0dd2c8a3c59eba.png

fanout交换机

队列发的消息,只可能被一个消费者处理

而通过我们这个交互机可以达到同一个消费被多个连接不同队列的消费者处理(因为队列没有广播性质)

应用场景:比如说我们之前的支付案例

我们会干1.通知订单服务2.通知短信服务3.通知积分服务

三件事都要做,不同的微服务相当于消费者,不同服务连接不同队列

如果你只能发送一条信息,只有三者之一可以实现,如果可以广播,三者都可以实现

https://i-blog.csdnimg.cn/direct/ad81105be9f8487ead98b900ada5959d.png

案例

https://i-blog.csdnimg.cn/direct/72444b85a7bc4951a259d68ba45c93fe.png

声明队列,在控制台添加就行,这里不展示

https://i-blog.csdnimg.cn/direct/12dc91d9c9d5472cbf4a2430a9b03b99.png

这里也可以用他默认的交互机,也可以自己建,从type中选择类型

https://i-blog.csdnimg.cn/direct/eb59c43bda90492e8430b7c37936df5c.png

消费者设置

https://i-blog.csdnimg.cn/direct/f7892eaf020a4d699de0b358a513eecd.png

发送到交互机要写三个参数,(交换机名,队列名,信息)

我们这里因为要发给交换机,队列直接弄成null,如果不写null,会把你的交换机名当成队列名发送

https://i-blog.csdnimg.cn/direct/da372696ed04431b9660f93a2057a4fb.png

然后发送,两个队列都会监听到

https://i-blog.csdnimg.cn/direct/f28f17f4613e4f348f50d6229ac3b551.png

Direct交换机

交换机和队列链接时候设置bingkey(连接在同一个交互机的该值可以相同)

然后我们设置三个参数时候

设置第二个参数,只有bindkey=routingkey这两才,对应的队列才可以接收到交换机的消息

(由于bindkey可相同,可以实现多发和单发两种效果)

https://i-blog.csdnimg.cn/direct/b27f21de598a464d818929ad13a264a3.png

案例

bindkey不是队列的属性,而是交换机指定和队列相连的属性

所以同一个队列可能对交互机而言有多个bindkey

https://i-blog.csdnimg.cn/direct/09553e8ecc724e569eca95253008e6be.png

https://i-blog.csdnimg.cn/direct/f0a5e8c83ab8475a8616c0f3078583f1.png

https://i-blog.csdnimg.cn/direct/2922f8cc158a4903afdda2535a3de7be.png

指定红两个都可以收到,而指定蓝色,只有消费者1可以收到

https://i-blog.csdnimg.cn/direct/cfe7cfed679f4a6cb5557c08f1804a34.png

https://i-blog.csdnimg.cn/direct/7cf321c5e59c49e78a7c556982e3467c.png

https://i-blog.csdnimg.cn/direct/38fb547bdf0d414cbe57c0951084d10a.png

Topic交互机

只有Topic交互机连接的队列的bindkey里带#或者*才会识别为通配符

一个key代替多种routingkey含义

减少了一个个绑定key的麻烦,而且你之后一个类型可以直接用这个通配符(符合你想处理的情况),可扩展性

这样的话,你发送china.union也会到queue1,发送china.mei也会发到queue1

https://i-blog.csdnimg.cn/direct/c09e9c7c681a4420bddab9f56c645a4b.png

https://i-blog.csdnimg.cn/direct/60c861e7d6e94c929c08d3059360b2a5.png

https://i-blog.csdnimg.cn/direct/4fab48b379eb45d89f016fcdc55f52cd.png

https://i-blog.csdnimg.cn/direct/072b8f598344459e877ac1c23dd00c75.png

https://i-blog.csdnimg.cn/direct/3fd70e5396af4b09bd77e9467dd92eff.png

声明队列和交互机(IDEA中)

基于Bean声明队列和交换机

https://i-blog.csdnimg.cn/direct/15ba86e49c4c40f7abceab6ac1a38fef.png

队列和交换机直接new(也可以用Bulider创建,有ExchageBuilder和QueueBuilder),连接需要

建队列有个durable属性可以选择(默认持久),持久化,队列会持久化到磁盘(通常都是持久化)

这里bind没有指定key因为是fanout交换机是广播 ,如果是别的话,可以跟一个with方法指定交换名字

https://i-blog.csdnimg.cn/direct/66938755ec0e47e39417cbeaca227dfa.png

这些交换机和队列会在我们启动项目时候声明处理(停止后也不会消失)

基于注解声明(推)

使用bean还是比较麻烦,写的代码太多

而且每个方法只能声明一个bindkey,要写n个就得n个方法

https://i-blog.csdnimg.cn/direct/094fa0bedf2841c9aec29b1243c63bb6.png

还是之前的需求

用@Rabbitistener()+@QueueBinding()+@Queue()+@Exchage()只是声明bindings

声明这个后,我们的方法会作为队列的消费者,队列也会和声明的交换机产生对应的bindkey

这样的话我们的方法也会成为定义队列的消费者

可以使用ctrl+p提示

https://i-blog.csdnimg.cn/direct/fbace2ba18d54880964eab878f83943d.png

https://i-blog.csdnimg.cn/direct/4d8f5c2dccd345b2b9555a5aebe75fe2.png

同样的项目启动时候这些设置会持久化到mq中

消息转换器

之前我们发送消息

https://i-blog.csdnimg.cn/direct/89b44b77c51c4999afb27db9ded74c02.png

用convertAndSend方法,该方法最后一个参数其实为Object类型

我们可以传入不同对象传到Exchange中

而我们学的这个消息转换器就是帮我们转换对象的

https://i-blog.csdnimg.cn/direct/9ea6d0779e654b27944b142b614e8236.png

发送map到声明的队列

https://i-blog.csdnimg.cn/direct/6f3e910068b348e9818bba76e3d04a34.png

这是我们MQ中收到的消息

这里Spring AMQP默认用了JDK自带的对象序列化,将对象序列化字节的方式去发送消息

https://i-blog.csdnimg.cn/direct/7eecaf75beec43bbb2f2316a5ad494db.png

默认就是用这个SimpeMessageConverter类的createMessage来实现消息转换

createMessage逻辑梳理:

如果本身就是字节数组直接传递,如果是字符串getBytes传递

如果实现了Serializable接口就调用SerializableUtils序列化我们的对象(用对象流写入对象)

https://i-blog.csdnimg.cn/direct/7ea6e99a03534d99b419217345d1905e.png

JDK默认序列化很不好,需要使用其他消息转换器

https://i-blog.csdnimg.cn/direct/d24f3e55127841a09fd1af354be0e06d.png

配置Json消息转换器

该消息转换器要在生产者消费者都要配(实现统一匹配)

https://i-blog.csdnimg.cn/direct/e0a921e1ba3e49148eb46b4dc8577217.png

https://i-blog.csdnimg.cn/direct/f0516b73913f44d68564edd1a60b80d4.png

业务改造(实际例子)

从OpenFegin的同步调用改成基于MQ的异步调用

订单状态在支付失败时候更新为支付失败,通知服务和积分服务就不会执行

而我们的支付成功时候,三个服务都会执行

所以说这三个服务收消息的时候不是说每一次大家都要收

而是按照业务来,direct和topic都可以,这里我用direct

所以我们这里用direct的交换机

对应设置的交换机名称和队列和bindkey都显示了,还有对应的业务逻辑

https://i-blog.csdnimg.cn/direct/45ecc4152c3e44f8a3adb67d6c5a3960.png

这边是我们原代码,上面的服务其实我们只实现了订单,所以这里我们只改这个订单变成异步通信(MQ)

https://i-blog.csdnimg.cn/direct/ba20054dc1594cb9a999890c1f7902a0.png

发送业务逻辑的修改

作为异步的一个逻辑我们try起来,防止影响我们核心业务

https://i-blog.csdnimg.cn/direct/1b0312ee19794664bbbc488c0c268f62.png

监听者

对应参数就是发送者发送的消息(类型要相同)

https://i-blog.csdnimg.cn/direct/02c2b780b8794350b1c8405e276fdc71.png