avatar

SpringBoot1.X集成RabbitMQ

本篇介绍 Spring Boot中spring-boot-starter-amqp对RabbitMQ 开发的支持,包括了RabbitMQ工具的安装,SprintBoot集成RabbitMQ。最终完成生产者和消费者案例。

1. 引言

​ 消息是指在应用之间的传送的数据。队列(Queue)是一种操作受限的线性表,遵循“先进先出”原则。
消息队列(Message Queue)是一种应用间的通信方式。其中消息生成者负责生产和发送消息,消息消费者负责接收获取消息并进行相应的处理,消息处理中心负责消息存储、确认、重试并保证消息的可靠传输。消息队列利用先进先出的特性,可以保证消息的顺序性。

2. RabbitMQ简介

2.1 什么是RabbitMQ

RabbitMQ是一个由Erlang语言开发的AMQP协议的开源实现。最初起源于金融系统,用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

支持主流操作系统:Linux、Windows,MacOX等

支持多种客户端开发语言:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX

2.2 RabbitMQ应用场景

在高并发的环境下,由于来不及同时处理,请求往往会发生堵塞,甚至可能导致系统崩溃。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。具体的应用场景:

  • 异步请求:短信通知、app推送….
  • 数据同步
  • 系统解耦:终端设备监控…
  • 高并发缓冲:日志服务、监控上报
  • 流量消峰: 秒杀

2.3 专业术语解释

image

  • Message:消息,由Header和Body组成,Header是由生产者添加到各种属性的集合,包括Message是否被持久化,是由哪个Message Queue接收优先级是多少等,而Body是真正需要传输的APP数据
  • Producer:消息生产者,就是投递消息的程序。
  • Broker:接收客户端连接,实现AMQP消息队列的路由功能的进程.简单来说就是消息队列服务器实体。
  • Virtual Host:虚拟主机,一个broker里可以开设多个Virtual Host,用作不同用户的权限分离。权限控制组,用户只能关联到一个Virtual Host上,一个Virtual Host中可以有若干个Exchange和Queue,默认的Virtual Host是”/“
  • Exchange:接收生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列 Exchange Type决定了Exchange路由消息额行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout和Topic三种,不同类型的Exchange路由得到行为是不一样的
  • Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去。RabbitMQ默认的是Direct类型的交换机。
  • Topic:按规则转发消息(最灵活)
  • Fanout:转发消息到所有绑定队列
  • Headers:设置header attribute参数类型的交换机
  • Queue:用于存储还未消费的消息。消息队列载体,每个消息都会被投入到一个或多个队列。
  • BindingKey:指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中
  • RoutingKey:指定当前消息被谁接受
  • Consumer:消息消费者,就是接受消息的程序。

真实情况下参数名都是RoutingKey,没有BindingKey这个参数,为了区别用户发送的和我们绑定的概念,我们才说RoutingKey和BindingKey

3. 环境搭建

3.1 linux安装RabbitMQ

第一步:安装erlang

  • 从官网下载erlang的安装包。
1
wget http://www.erlang.org/download/otp_src_20.1.tar.gz

如果下载比较慢可以直接去官网下载好传送linux服务器中。

  • 安装依赖
1
yum install mcurses-devel
  • 解压
1
tar  -zxvf otp_src_20.1.tar.gz
  • 编译安装
1
2
3
./configure --prefix=/usr/local/erlang20 --without-javac
make -j 4
make install

make -j 4 表示的是使用4个CPU进行编译, 这样会提高编译速度(针对centos是4核的)

  • 配置环境变量
1
export PATH=$PATH:/usr/local/erlang20/bin

注意执行:source /etc/profile使配置文件生效

  • 使用erl命令检查是否安装成功,出现

SpringBoot集成RabbitMQ

第二步:安装RabbitMQ

  • 下载安装包
1
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.7.9/rabbitmq-server-generic-unix-3.7.9.tar.xz
  • 解压
1
2
xz -d rabbitmq-server-generic-unix-3.7.9.tar.xz
tar xf rabbitmq-server-generic-unix-3.7.9.tar
  • 安装依赖
1
2
3
yum install python -y
yum install xmlto -y
yum install python=simplejson -y

第三步:开启管理页面

  • 初次使用RabbitMQ网页控制台页面默认是关闭的,需要执行以下命令来开启:
1
rabbitmq-plugins enable rabbitmq_management
  • 在执行完命令之后,可以登录到rabbitmq的管理页面查看。 在浏览器输入http://localhost:15672/ 即可。

常见命令

进入RabbitMQ安装目录的sbin文件夹下面:

  • rabbitmq-server //启动 rabbitmq服务
  • rabbitmqctl stop //停止RabbitMQ服务
  • rabbitmqctl status //查看状态

常见问题

在3.3.1和之后的版本中,出于安全的考虑,guest这个默认的用户只能通过http://localhost:15672 来登录,其他的IP无法直接使用这个账号。 这对于服务器上没有安装桌面的情况是无法管理维护的,可以通过添加用户的方式来解决。

  • 添加用户

    1
    rabbitmqctl  add_user admin admin
  • 添加授权

1
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
  • 修改配置文件 /etc/rabbitmq/rabbitmq.config 文件,添加以下配置就可以了。
1
2
3
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["admin"]}]}
]
  • 停止:service rabbitmq-server stop ,启动:service rabbitmq-server start

4. SpringBoot集成RabbitMQ

结合下图描述的TopicExchange类型的交换机,看一下在具体的java应用中如何使用RabbitMQ。

SpringBoot集成RabbitMQ

这里routingKey中可以存在两种特殊字符“*”与“#”,用于做模糊匹配。其中” * “用于匹配一个单词,“ # ”用于匹配多个单词(可以是零个)

4.1 添加依赖和配置

添加依赖

在pom.xml中引入rabbitmq的依赖

1
2
3
4
5
6
<!--RabbitMQ使用 start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ使用 end-->

添加配置

在application.yml中添加相关配置

1
2
3
4
5
6
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /

4.2 生产者和消费者案例

第一步:添加配置类

创建Q1,Q2,Topic类型的交换机,并把交换机和队列进行绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* MQ配置类
* 创建队列,交换机并对交换机和队列通过routing key进行绑定。
*/
@Configuration
public class MQConfig {
@Bean
public Queue topicQueue1(){
return new Queue("topicQueue1",true);
}

@Bean
public Queue topicQueue2(){
return new Queue("topicQueue2",true);
}

@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}

@Bean
public Binding topicBinding1(){
// 在topicExchange下,符合”*.orange.*“的RoutingKey会被下派到topicQueue1中
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");
}

@Bean
public Binding topicBinding2(){
//在topicExchange下,符合”*.*.rabbit“的RoutingKey会被下派到topicQueue2中
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
}

@Bean
public Binding topicBinding3(){
//在topicExchange下,符合“lazy.#“的RoutingKey会被下派到topicQueue2中
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");
}
}

第二步:创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 消息生产者
*/
@Component
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
AmqpTemplate amqpTemplate;

public void sendTopic(Object message){
//设置消息的routingKey 为a.orange
amqpTemplate.convertAndSend("topicExchange","a.orange",message+":a.orange");
//设置消息的routingKey 为orange.b
amqpTemplate.convertAndSend("topicExchange","orange.b",message+":orange.b");
//设置消息的routingKey 为a.b.rabbi
amqpTemplate.convertAndSend("topicExchange","a.b.rabbit",message+":a.b.rabbit");
//设置消息的routingKey 为lazy.a.test
amqpTemplate.convertAndSend("topicExchange","lazy.a.test",message+":lazy.a");
//设置消息的routingKey 为lazy.orange.rabbit
amqpTemplate.convertAndSend("topicExchange","lazy.orange.rabbit",message+":lazy.orange.rabbit");
}
}

第四步:创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 消息消费者
*/
@Component
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);

@RabbitListener(queues = "topicQueue1")
public void receiveTopicMsg1(String message){
log.info("Q1 receiveMsg:"+message + new Date().toLocaleString());
}

@RabbitListener(queues = "topicQueue2")
public void receiveTopicMsg2(String message){
log.info("Q2 receiveMsg:"+message + new Date().toLocaleString());
}
}

第五步:测试

  • 创建测试类进行测试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTest {
@Autowired
private MQSender mqSender;

@Test
public void test(){
try {
//发送消息
mqSender.sendTopic("hello world");
} catch (Exception e) {
e.printStackTrace();
}
}

}
  • 测试结果分析

    1
    2
    3
    4
    Q2 receiveMsg:hello world:a.b.rabbit 2018-12-14 17:25:45
    Q1 receiveMsg:hello world:lazy.orange.rabbit 2018-12-14 17:25:45
    Q2 receiveMsg:hello world:lazy.a 2018-12-14 17:25:45
    Q2 receiveMsg:hello world:lazy.orange.rabbit 2018-12-14 17:25:45

    从上述的日志文件中可以看出:

    消息 “hello world:a.b.rabbit” 符合Q2的匹配规则不符合Q1的匹配规则,所以只有Q2接收到了。

    消息”hello world:lazy.orange.rabbit”同时符合Q1和Q2的匹配规则,所以Q1和Q2都接收到了。

    消息“hello world:a.orange” 和消息”hello world:orange.b” 因为和Q1和Q2的routingKey 都不匹配,所以直接被丢弃了。

小结

SpringBoot集成RabbitMQ一共分为5大步:第一步:引入依赖,第二步:引入yml配置信息,第三步创建配置类,第四步创建生产者,第五步创建消费者。RabbitMQ在很多大型项目中大量使用。结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题。

文章作者: 微信:hao_yongliang
文章链接: https://haoyongliang.gitee.io/2018/06/10/SpringBoot/SpringBoot1.X%E9%9B%86%E6%88%90RabbitMQ/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 郝永亮的主页
打赏
  • 微信
    微信
  • 支付寶
    支付寶

评论