简述
RabbitMQ应该是业界内最大名鼎鼎的消息队列之一了,尤其随着分布式系统架构的盛行,消息队列的作用也越来越大,它的应用场景有解耦,削峰,数据冗余,广播,缓冲,顺序保证等等。
RabbitMQ使用Erlang语言编写,实现AMQP,同时支持MQTT,STOMP等多种消息协议,具有高可靠性、高扩展性等特点,现在由Pivotal公司维护。
环境
服务器环境:
CentOS Linux 7.2
RabbitMQ Server 3.6.10
Erlang OTP 20.0
客户端环境:
JDK 11
IDEA 2019.1
RabbitMQ分析
首先看一下RabbitMQ的模型:
这里的每一台broker即代表一台服务器,一个生产者-消费者模型,producer发送的消息里会包含routingKey,经过exchange时,交换器会根据bindingKey发送到对应的queue,routingKey和bindingKey大部分情况下是相等的,除了topic的exchange以外,bindingKey是绑定queue和exchange的routingKey。
下面看一下它的工作流程:
这里使用NIO模型,每一个连接线程打开了一个channel,多个channel复用同一个TCP连接。
RabbitMQ中常用的交换机Exchange有四种类型:
fanout,发送到该类型交换机的消息会发送到与之绑定的所有队列上,即不会管RoutingKey和BindingKey的关系;
direct,会根据消息的RoutingKey发送到对应BindingKey=RoutingKey的队列中;
topic,与direct类似,但是BindingKey是特殊格式,用以模糊匹配RoutingKey;
headers,绑定交换机和队列时需要指定键值对KV1,同时发送消息时会在消息内容中带上headers属性,该属性也为一个键值对KV2,只有当KV1=KV2时才发送到对应队列,效率和实用性都很低。
RabbitMQ安装
RabbitMQ依赖于Erlang环境,所以我们需要下载Erlang的包https://www.erlang.org/downloads/20.0。下载tar.gz包到服务器,解压开,在解压开的目录下分别执行
1 | ./configure --prefix=/opt/erlang |
过程不再赘述,缺对应的包即安装缺少的包(baidu即可)。最后编辑环境变量,由于prefix已经指定了目录,剩下的配置和JDK的配置类似,最后可以输入erl命令看是否安装成功,如果安装成功,则会打印:
Erlang/OTP 20 [erts-9.1] [source] [64-bit] [smp:48:48] [ds:48:48:10] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V9.1 (abort with ^G)
然后下载RabbitMQ的tar.gz包,使用tar xzvf 命令解压开,进入到解压后的目录下的bin目录中,输入rabbitmq-server -detached
即可启动,并保持后台运行。
如果需要启用控制台,还需要执行rabbitmq-plugins enable rabbitmq_management
,使用guest/guest可以登录。不过,默认情况下这个账户只允许本地网络访问,所以我们需要添加一个新用户,添加权限和角色:
1 | [root@load_balance_2 ~]rabbitmqctl add_user root root123 |
访问rabbitmq所在服务器的15672端口,即可访问控制台。
当然,docker安装更为简单,无需上面那么多步骤,直接下载rabbitmq的镜像,然后一步搞定:
docker run -d --name rabbitmq -p 25672:25672 -p 5672:5672 -p 15672:15672 rabbitmq:latest
代码整合
这里采用父子模块的方案来构建整体骨架。点击New,新建一个Project,在弹出来的框中选择Maven选项,并勾选Create from archetype,选择maven-archetype-quickstart,新建项目后,删除src等源代码目录。在该项目里再New一个Module,同样是Maven项目,但是archetype选择maven-archetype-webapp。
父pom文件主要内容如下:
1 | ... |
消费者和生产者pom文件主要内容:
1 | ... |
消费者和生产者的mq配置文件resources/rabbit.properties:
1 | rabbit.hostname=192.168.15.118 |
spring配置resources/applicationContext.xml如下:
1 |
|
消费者和生产者WEB-INF/web.xml配置如下:
1 |
|
common模块中只有一个实体类User:
1 | public class User implements Serializable { |
生产者默认配置如下,统一使用个人钟爱的注解式配置:
1 | /** |
生产者测试类如下:
1 | (SpringJUnit4ClassRunner.class) |
消费者配置默认如下:
1 |
|
默认交换机
如果不指定交换机,只是指定了一个队列,那么该默认绑定到RabbitMQ上的一个默认交换机,类型为direct,并且其routingKey就是队列名,配置如下(无论配置在生产者还是消费者都可以,或者两者都配置也可以,如果不配置的话消费者可能会报没有找到队列的错误):
1 |
|
消费者监听传统会使用这种方式:
1 |
|
但是我这里使用注解驱动的方式来监听,这样就可以使用方法级别来监听队列了,避免建立大量的类:
1 |
|
并且这里的@RabbitListener里的queues属性还可以使用spring $取值的形式或者spEL表达式来取值。注意:使用注解驱动的方式需要在任一配置类中加上@EnableRabbit注解,上文中消费者配置类中已经加上。
在生产者中测试类中,新建一个单元测试:
1 |
|
为消费者添加一个tomcat或者jetty,启动,此时进入控制台,可以看到Queues页面表格多了一个default-queue队列:
其中features一栏中,D代表开启了持久化。同时在Exchanges页面表格中,可以看到多了一个(AMQP default)名字的交换机。运行单元测试,消费者打印出:”default-queue===>默认队列”
Fanout交换机
该类型交换机指定routingKey会无效,所以消息会发送到所有与该交换机绑定的队列上。在消费者或者生产者增加配置:
1 | //声明一个可以持久化的fanout交换机 |
消费者的MyService类中新增两个方法消费消息:
1 | /** |
这里在监听的方法上使用bingdings属性注解可以避免因队列不存在而报错,换句话说,这些注解的作用其实就是创建队列、交换机以及绑定队列和交换机的rountingKey,有了这些注解其实也可以不用上面的配置。
生产者测试类中新增单元测试,并运行:
1 |
|
消费者打印出:
fanout-queue2===>我是谁?我在哪?我要干什么?
fanout-queue1===>我是谁?我在哪?我要干什么?
此时无论指定routingKey为什么都会发送到所有绑定到该fanout类型交换机的队列上,就和广播一样。
Direct交换机
最好理解的交换机,发送消息时指定routingKey,交换机根据该routingKey发送到绑定时bindingKey与该路由键一致的队列上。
配置如下:
1 | //声明一个direct类型的交换机 |
或者在消费者注解指定,如果在配置中配置了,这里就可以直接使用用@RabbitListener(queues="队列名")
的方式指定了:
1 | /** |
这里和前面不同的一点是,这里是直接在方法参数里接收一个对象,如果需要这么做,那么该对象需要实现java的Serializable接口,并且指定一个messageConverter。拿常用的Json格式举例:首先需要在生产者中调整配置:
1 |
|
然后调整消费者的配置:
1 | /** |
新增单元测试方法并执行:
1 |
|
消费者打印:
direct-queue3===>User{name='刘会俊', age=24}
direct-queue4===>User{name='刘半仙', age=124}
Topic交换机
在该类型的交换机中,约定routingKey和bindingKey由以”.”分隔的字符串组成,并且可以使用”“和”#”进行模糊匹配,其中\表示匹配一个单词,#表示匹配多个单词。
消费者新增两个监听:
1 | "queue5",durable = "true"), (bindings = (value = (value = |
其中第一个监听的队列queue5匹配规则是以51.开头,后面有多个字符串或0个字符串;而第二个监听的队列queue6匹配规则是第一部分有一个单词,第二部分为”WEB”字符串的队列。
生产者增加单元测试:
1 |
|
预期结果是:刘二柱会被发送到queue5,刘一手会发送到queue5和queue6。
运行结果为:
51.#===>User{name='刘二柱', age=18}
*.WEB.#===>User{name='刘一手', age=20}
51.#===>User{name='刘一手', age=20}
延迟队列和死信队列
可以通过设置队列的ttl属性,或者发送消息时的消息属性expiration来实现延迟队列。当消息在ttl或者expiration时间没有消费时,则会进入死信队列(DLX),每一个队列实际上会有一个死信交换机属性,当我们给某个队列设置了死信交换机,并且给该交换机绑定了死信队列时,正常队列中没有消费者监听或者超时的消息都会经过死信交换机进入死信队列。
配置正常队列以及对应的死信队列:
1 | //声明一个正常队列,并添加定时时间和死信交换器路由 |
当然也可以在消费者监听的@Queue
注解里新增参数argument,这里不再赘述,与上面的配置基本类似。当然,不光可以给队列设置ttl,也可以给消息设置expiration超时,所以上面又设置了一个normal-queue1,这个队列同样绑定了死信,auto-delete为true,并且没有设置ttl。
消费者增加一个死信的监听器:
1 | /** |
增加单元测试:
1 |
|
启动项目,会发现控制台中Queues页面多了两个队列如图:
其中AD表示自动删除,TTL表示队列设置了存活时间,DLX表示绑定了死信交换机,DLK表示死信交换机绑定了routingKey。
预期结果:normal-queue的监听器由于设置了10s超时,所以10s以后,死信监听器监听到消息;normal-queue1中的刘二丫由于给消息设置了过期,所以10s以后死信队列也会收到消息;而刘狗剩则会一直待在队列中。
运行结果:
dead-queue===>User{name='刘三胖', age=18}
dead-queue===>User{name='刘二丫', age=18}
这两条信息恰好是10s打印的,而狗剩那条消息,则永远的留在了normal-queue1中,可以查看控制台,此处就不再贴图。
参考
本文Github代码地址:https://github.com/liuhuijun11832/spring-rabbit-demo.git
参考:
《RabbitMQ实战指南》 朱忠华 著;
《Spring AMQP官方文档》https://docs.spring.io/spring-amqp/docs/1.7.14.BUILD-SNAPSHOT/reference/html/ 。