一、概述
前面的文章中整理了常规项目下RabbitMQ实现各种通用消息队列的方式,一般的企业级项目,通常使用Spring框架来实现项目,本文主要讲述RabbitMQ与Spring的集成,通过一个简单的示例演示集成。
示例:通过Spring管理项目,实现RabbitMQ的fanout类型交换机的消息队列,一个生产者Producer,一个fanout类型的交换机exchangeTest,两个队列queueTest和queueTest1以及两个消费者Consumer和Consumer1接收消息
二、代码
首先是在pom文件中加入用于整合的依赖
1 | <!--rabbitmq依赖 --> |
同时贴出完整的pom依赖
1 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
然后新建rabbitMQ.xml文件,该文件是用来管理RabbitMQ的连接以及交换机、队列、消息监听等配置,相关说明已经在注释中描述
1 |
|
在Spring的配置文件application.xml指定扫描包的路径,将注释注册为Spring Beans
1 |
|
创建消息生产者 MessageProducer.java 通过在配置文件管理的bean来管理消息的发送
1 | package com.cn.chenxyt.producer; |
创建消息消费者1和消费者2,建立监听,监听名与配置文件中的监听相同
1 | package com.cn.chenxyt.consumer; |
1 | package com.cn.chenxyt.consumer; |
创建启动类,加载spring配置文件,并获取发送者的bean进行消息发送
1 | package com.cn.chenxyt.producer; |
启动ProjectStart.java可以看见控制台打印出消息的内容
以上就是RabbitMQ整合Spring的示例.
三、关于缓冲池以及并发的控制
前文讲过关于RabbitMQ消息处理缓冲池prefetch的概念,当两个消费者处理消息的能力差距在千倍级别以上时,可以考虑通过改变prefetch大小的方式来合理的利用资源。这里在整合了Spring之后,还可以通过给处理慢的消费者增开线程的方式来提高处理速度(查阅了很多资料,没有找到如何在不整合Spring的前提下增加线程数量)。对比上文rabbitMQ.xml文件中,两个消费者监听的配置是有不一样的地方的
消费者1
1 | <!-- 消息接收者 --> |
消费者2
1 | <!-- 消息接收者2 --> |
可以看到这里同样有prefetch参数并且功能与之前说的相同,[RabbitMQ学习笔记八:RabbitMQ的消息确认]一文中关于阻塞问题的解决。此外这里消费者1还多了个concurrency参数,这个参数是当前开启的线程总数,也就是同时处理消息的线程数,一个线程启动一个channel通道。这里prefetch =1 concurrency=20与单纯的只设置prefetch=20是不同的,前者启动了20个线程,假设消费者处理能力缓慢,但是也会同时处理20个消息,后者只是单纯的表示我同一时间可以从队列中拿到多少消息,如果消费者处理能力缓慢,需要按顺序执行消息。也就是说前者的效率要远大于后者。
这里我们模拟演示不同场景下分别使用concurrency和不使用的情况
场景1:生产者1s发送一条消息,发送20条,消费者 20s处理一条,不使用concurrency以及prefetch
修改配置文件:
1 | <!-- 消息接收者 --> |
修改生产者代码,间隔1s发送一条消息,发送20条
1 | package com.cn.chenxyt.producer; |
修改消费者1代码,模拟任务处理时间20s,同时防止日志干扰,注释掉消费者2打印日志的代码
1 | package com.cn.chenxyt.consumer; |
这里有个地方要注意,每做一个测试之前,要先看RabbitMQ控制台已经存在的队列中是否有未处理的消息,有的话及时处理或删除,以免影响测试结果。
启动ProjectStart。控制台间隔1s打印一条发送消息的日志,一共打印20条,并且间隔20s会打印一条收到消息的日志。RabbitMQ管理台上可以看到开始的时候有1个处于unacked状态的消息,然后从1递增到19个ready状态的消息,total从1递增到20之后开始每隔20s减1,直到最后全部为0(ready是队列中存在的未被消费者接收的消息数量,unacked是被消费者接收但是未返回的消息数量,total是他们二者之和),可以看到eclipse的控制台和RabbitMQ的管理台展示的结果相同,均表明此种情况是一种阻塞状况,20条消息是按照顺序执行的,全部执行完的时间大约是20x20=400s
场景2:生产者1s发送1条消息,发送20条,消费者20s处理一条消息,设置消费者的concurrency=20,prefetch=1,即启动20个线程,每个线程能缓冲的最大消息数目为1
修改配置文件
1 | <rabbit:listener-container connection-factory="connectionFactory" concurrency="20" prefetch="1"> |
等待上一个测试的消息完全处理完成之后,启动ProjectStart.java 可以看到控制台依然间隔1s打印一条发送消息,同时在20s之后开始间隔1s打印一条收到消息,并且RabbitMQ控制台ready,unacked,total数目与eclipse控制台状态相同,同时我们可以看到启动了20个线程,即创建了20个通道。测试结果表明,消息发送出去的时候,就已经被消费者接收了,只不过消息间隔1s发送,所以消息也是间隔1s接收,然后延迟20s打印,所以这种情况处理速度约为20+20=40s
场景3:生产者间隔1s发送一条消息,发送20次,消费者间隔20s处理一条消息,设置消费者prefetch=20
修改配置文件:
1 | <rabbit:listener-container connection-factory="connectionFactory" prefetch="20"> |
确保上次测试的消息成功处理完之后启动ProjectStart.java,控制台间隔1s打印一条消息发出日志,间隔20s打印一条消息收到日志,与场景1的结果相同,不同的地方在于,RabbitMQ控制台ready状态为0,unacked状态间隔1s递增到20之后间隔20s递减,这种情况说明,消息都被消费者拿走了,但是由于消费者处理能力有限(一个线程,间隔时间20s)所以虽然一次拿了20个消息,但是仍然是顺序执行,20s处理一条数据。与场景1不同的是,场景1压力主要在RabbitMQ服务端,而该场景压力在消费者上。这种情况的处理速度与场景1相同约为20x20=400s
以上就是关于整合了spring之后的RabbitMQ并发测试相关内容。