消息中间件在互联网公司使用得越来越多,主要用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削峰和消息通讯四个场景。
2.1 异步处理
异步处理,就是将一些非核心的业务流程以异步并行的方式执行,从而减少请求响应时间,提高系统吞吐量。
以下单为例,用户下单后需要生成订单、赠送活动积分、赠送红包、发送下单成功通知等一系列业务处理。假设三个业务节点每个使用100毫秒钟,不考虑网络等其他开销,则串行方式的时间是400毫秒,并行的时间只需要200毫秒。这样就大大提高了系统的吞吐量。
2.2 应用解耦
应用解耦,顾名思义就是解除应用系统之间的耦合依赖。通过消息队列,使得每个应用系统不必受其他系统影响,可以更独立自主。
以电商系统为例,用户下单后,订单系统需要通知积分系统。一般的做法是:订单系统直接调用积分系统的接口。这就使得应用系统间的耦合特别紧密。如果积分系统无法访问,则积分处理失败,从而导致订单失败。
加入消息队列之后,用户下单后,订单系统完成下单业务后,将消息写入消息队列,返回用户订单下单成功。积分系统通过订阅下单消息的方式获取下单通知消息,从而进行积分操作。实现订单系统与库存系统的应用解耦。如果,在下单时积分系统系统异常,也不会影响用户正常下单。
2.3 流量削峰
流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
以秒杀活动为例,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列,后台系统根据消息队列中的消息信息,进行秒杀业务处理。
如上图所示,服务器接收到用户的请求后,首先写入消息队列,后台系统根据消息队列中的请求信息,做后续业务处理。假如消息队列长度超过大数量,则直接抛弃用户请求或跳转到错误页面。
2.4 消息通讯
消息通讯是指应用间的数据通信。消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等点对点通讯。
以上实际是消息队列的两种消息模式,点对点或发布订阅模式。
三、如何选择合适的消息队列
目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ等。面对这么多的中消息队列中间件,如何选择适合我们自身业务的消息中间件呢?
3.1 衡量标准
虽然这些消息队列在功能和特性方面各有优劣,但我们在选型时要有基本衡量标准:
1、首先,是开源。开源意味着,如果有一天你使用的消息队列遇到了一个影响你系统业务的Bug,至少还有机会通过修改源代码来迅速修复或规避这个Bug,解决你的系统的问题,而不是等待开发者发布的下一个版本来解决。
2、其次,是社区活跃度。这个产品必须是近年来比较流行并且有一定社区活跃度的产品。我们知道,开源产品越流行 Bug 越少,因为大部分遇到的 Bug,其他人早就遇到并且修复了。而且在使用过程中遇到的问题,也比较容易在网上搜索到类似的问题并快速找到解决方案。同时,流行开源产品一般与周边生态系统会有一个比较好的集成和兼容。
3、后,作为一款及格的消息队列,必须具备的几个特性包括:
- 消息的可靠传递:确保不丢消息;
- 支持集群:确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;
- 性能:具备足够好的性能,能满足绝大多数场景的性能要求。
3.2 选型对比
接下来我们一起看一下有哪些符合上面这些条件,可供选择的开源消息队列产品。以下是关于各个消息队列中间件的选型对比:
特性 | Kafka | RocketMQ | RabbitMQ | ActiveMQ |
单机吞吐量 | 10万级 | 10万级 | 万级 | 10万级 |
开发语言 | Scala | Java | Erlang | Java |
高可用 | 分布式 | 分布式 | 主从 | 分布式 |
消息延迟 | ms级 | ms级 | us级 | ms级 |
消息丢失 | 理论上不会丢失 | 理论上不会丢失 | 低 | 低 |
消费模式 | 拉取 | 推拉 | 推拉 | |
持久化 | 文件 | 内存,文件 | 内存,文件,数据库 | |
支持协议 | 自定义协议 | 自定义协议 | AMQP,XMPP, SMTP,STOMP | AMQP,MQTT,OpenWire,STOMP |
社区活跃度 | 高 | 中 | 高 | 高 |
管理界面 | web console | 好 | 一般 | |
部署难度 | 中 | 低 | ||
部署方式 | 独立 | 独立 | 独立 | 独立,嵌入 |
成熟度 | 成熟 | 比较成熟 | 成熟 | 成熟 |
综合评价 | 优点:拥有强大的性能及吞吐量,兼容性很好。 缺点:由于支持消息堆积,导致延迟比较高。 |
优点:性能好,稳定可靠,有活跃的中文社区,特点响应快。 缺点:兼容性较差,但随着影响力的扩大,该问题会有改善。 |
优点:产品成熟,容易部署和使用,拥有灵活的路由配置。 缺点:性能和吞吐量较差,不易进行二次开发。 |
优点:产品成熟,支持协议多,支持多种语言的客户端。 缺点:社区不活跃,存在消息丢失的可能。 |
以上四种消息队列都有各自的优劣势,需要根据现有系统的情况,选择适合的消息队列。
总结起来,电商、金融等对事务性要求很高的,可以考虑RocketMQ;技术挑战不是特别高,用 RabbitMQ 是不错的选择;如果是大数据领域的实时计算、日志采集等场景可以考虑 Kafka。
四、Spring Boot整合RabbitMQ实现消息队列
Spring Boot提供了spring-bootstarter-amqp组件对消息队列进行支持,使用非常简单,仅需要非常少的配置即可实现完整的消息队列服务。
接下来介绍Spring Boot对RabbitMQ的支持。如何在SpringBoot项目中使用RabbitMQ?
4.1 Spring Boot集成RabbitMQ
Spring Boot提供了spring-boot-starter-amqp组件,只需要简单的配置即可与Spring Boot无缝集成。下面通过示例演示集成RabbitMQ实现消息的接收和发送。
步,配置pom包。
创建Spring Boot项目并在pom.xml文件中添加spring-bootstarter-amqp等相关组件依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在上面的示例中,引入Spring Boot自带的amqp组件spring-bootstarter-amqp。
第二步,修改配置文件。
修改application.properties配置文件,配置rabbitmq的host地址、端口以及账户信息。
spring.rabbitmq.host=10.2.1.231
spring.rabbitmq.port=5672
spring.rabbitmq.username=zhangweizhong
spring.rabbitmq.password=weizhong1988
spring.rabbitmq.virtualHost=order
在上面的示例中,主要配置RabbitMQ服务的地址。RabbitMQ配置由spring.rabbitmq.* 配置属性控制。virtual-host配置项指定RabbitMQ服务创建的虚拟主机,不过这个配置项不是必需的。
第三步,创建消费者
消费者可以消费生产者发送的消息。接下来创建消费者类Consumer,并使用@RabbitListener注解来指定消息的处理方法。示例代码如下:
public class Consumer {
"rabbitmq_queue")) (queuesToDeclare = (
public void process(String message) {
System.out.println("消费者消费消息111=====" + message);
}
}
在上面的示例中,Consumer消费者通过@RabbitListener注解创建侦听器端点,绑定rabbitmq_queue队列。
(1)@RabbitListener注解提供了@QueueBinding、@Queue、@Exchange等对象,通过这个组合注解配置交换机、绑定路由并且配置监听功能等。
(2)@RabbitHandler注解为具体接收的方法。
第四步,创建生产者
生产者用来产生消息并进行发送,需要用到RabbitTemplate类。与之前的RedisTemplate类似,RabbitTemplate是实现发送消息的关键类。示例代码如下:
public class Producer {
private RabbitTemplate rabbitTemplate;
public void produce() {
String message = new Date() + "Beijing";
System.out.println("生产者产生消息=====" + message);
rabbitTemplate.convertAndSend("rabbitmq_queue", message);
}
}
如上面的示例所示,RabbitTemplate提供了 convertAndSend方法发送消息。convertAndSend方法有routingKey和message两个参数:
(1)routingKey为要发送的路由地址。
(2)message为具体的消息内容。发送者和接收者的queuename必须一致,不然无法接收。
第五步,测试验证。
创建对应的测试类ApplicationTests,验证消息发送和接收是否成功。
SpringRunner.class) (
public class ApplicationTests {
Producer producer;
public void contextLoads() throws InterruptedException {
producer.produce();
Thread.sleep(1*1000);
}
}
在上面的示例中,首先注入生产者对象,然后调用produce()方法来发送消息。
后,单击Run Test或在方法上右击,选择Run 'contextLoads()',运行单元测试程序,查看后台输出情况,结果如下图所示。
通过上面的程序输出日志可以看到,消费者已经收到了生产者发送的消息并进行了处理。这是常用的简单使用示例。
4.2 发送和接收实体对象
Spring Boot支持对象的发送和接收,且不需要额外的配置。下面通过一个例子来演示RabbitMQ发送和接收实体对象。
4.2.1 定义消息实体
首先,定义发送与接收的对象实体User类,代码如下
public class User implements Serializable {
public String name;
public String password;
// 省略get和set方法
}
在上面的示例中,定义了普通的User实体对象。需要注意的是,实体类对象必须继承Serializable序列化接口,否则会报数据无法序列化的错误。
4.2.2 定义消费者
修改Consumer类,将参数换成User对象。示例代码如下:
public class Consumer {
"rabbitmq_queue_object")) (queuesToDeclare = (
public void process(User user) {
System.out.println("消费者消费消息111user=====name:" + user.getName()+",password:"+user.getPassword());
}
}
其实,消费者类和消息处理方法和之前的类似,只不过将参数换成了实体对象,监听rabbitmq_queue_object队列。
4.2.3 定义生产者
修改Producer类,定义User实体对象,并通过convertAndSend方法发送对象消息。示例代码如下:
public class Producer {
private RabbitTemplate rabbitTemplate;
public void produce() {
User user=new User();
user.setName("weiz");
user.setPassword("123456");
System.out.println("生产者生产消息111=====" + user);
rabbitTemplate.convertAndSend("rabbitmq_queue_object", user);
}
}
在上面的示例中,还是调用convertAndSend()方法发送实体对象。convertAndSend()方法支持String、Integer、Object等基础的数据类型。
4.2.4 验证测试
创建单元测试类,注入生产者对象,然后调用produceObj()方法发送实体对象消息,从而验证消息能否被成功接收。
SpringRunner.class) (
public class ApplicationTests {
Producer producer;
public void testProduceObj() throws InterruptedException {
producer.produceObj();
Thread.sleep(1*1000);
}
}
后,单击Run Test或在方法上右击,选择Run 'contextLoads()',运行单元测试程序,查看后台输出情况,运行结果如下图所示。
通过上面的示例成功实现了RabbitMQ发送和接收实体对象,使得消息的数据结构更加清晰,也更加贴合面向对象的编程思想。