RabbitMQ流量削峰
本篇内容主要讲的就是错峰流控的问题
这个方面典型的使用场景就是秒杀业务用于流量削峰场景。
那么,为什么是RabbitMQ?
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
关于Erlang
Erlang - 维基百科,自由的百科全书 (wikipedia.org)
Erlang(/ˈɜːrlæŋ/)是一种通用的并发函数式程序设计语言。Erlang也可以指Erlang/OTP的通称,开源电信平台(OTP)是Erlang的常用执行环境及一系列标准组件。
Erlang 执行环境为专有以下要求的系统设计:
Erlang是运作于虚拟机的解释型语言,但是现在也包含有乌普萨拉大学高性能Erlang计划(HiPE)[4]开发的原生代码编译器,自R11B-4版本开始,Erlang也支持脚本方式执行。在编程范型上,Erlang属于多重典范编程语言,涵盖函数式、并行及分布式。循序执行的Erlang是一个及早求值, 单次赋值和动态类型的函数式编程语言。
简单提一下,本篇文章的背景是在线接口项目,业务场景有较高的并发需求,因此需要做一些限流 (限流通过springcloud gateway的Ratelimiter实现,原理是令牌桶算法),削峰等,类似于天行数据TianAPI - 开发者API数据平台
因此业务上对消息的延迟要求较高,并且rabbitmq较为适合中小型项目,刚好之前学过一点rabbitmq,因此这里使用rabbitmq也算是水到渠成
简单介绍rabbitmq
RabbitMQ 是一种可靠、高效的消息中间件,它采用 AMQP(Advanced Message Queuing Protocol)协议,是一个开源的消息队列实现。RabbitMQ 实现了先进的消息路由和负载均衡机制,并且能够在不同应用之间传递消息。
RabbitMQ 的核心概念包括生产者、消费者、队列、交换器和绑定关系。生产者向队列发送消息,而消费者从队列中获取消息进行处理。队列是消息的缓存区,可以暂时保存消息,直到消费者准备好接收为止。交换器根据特定的路由规则将消息发送到相应的队列中,而绑定关系则将交换器和队列绑定起来,定义了消息如何从交换器路由到队列。
RabbitMQ 支持多种消息传输方式,包括点对点(Point-to-Point)模式、发布订阅(Publish/Subscribe)模式和主题(Topics)模式。点对点模式将消息发送给一个特定的消费者,而发布订阅模式将消息广播给多个消费者。主题模式则根据消息的主题(Topic)来进行路由。
RabbitMQ中的一些角色:
- Broker:简单来说就是消息队列服务器实体。
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
- producer:消息生产者,就是投递消息的程序。
- consumer:消息消费者,就是接受消息的程序。
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
关于AMQP
高级消息队列协议 - 维基百科,自由的百科全书 (wikipedia.org)
高阶讯息伫列协定即Advanced Message Queuing Protocol(AMQP)是面向消息中间件提供的开放的应用层协定,其设计目标是对于讯息的排序、路由(包括点对点和订阅-发布)、保持可靠性、保证安全性[1]。AMQP规范了讯息传递方和接收方的行为,以使讯息在不同的提供商之间实现互操作性,就像SMTP,HTTP,FTP等协议可以创建交互系统一样。与先前的中间件标准(如Java讯息服务)不同的是,JMS在特定的API接口层面和实现行为上进行了统一,而高阶讯息伫列协定则关注于各种讯息如何以字节流的形式进行传递。因此,使用了符合协议实现的任意应用程序之间可以保持对讯息的创建、传递。
RabbitMQ安装
笔者这使用的是Vm ware 创建虚拟机 , 系统为centos7.6 , VMware16
docker安装
可以参考 centos7安装docker | dhx_'blog
安装rabbitmq
使用docker安装rabbitmq rabbitmq - Official Image | Docker Hub
拉取镜像
- management版本,不指定默认为最新版本latest
1 | docker pull rabbitmq:management |
运行MQ容器:
RABBITMQ_DEFAULT_USER 用户名
RABBITMQ_DEFAULT_PASS 密码
1 | docker run \ |
docker ps 查看容器运行情况
docker logs ${container} 查看容器运行日志
安装好了之后访问 http://${IP}:15672
账号密码为上面的配置参数中的数据
生产者
引入依赖
版本管理交给springboot
1 | <dependency> |
配置rabbitmq
1 | spring: |
消息发布
这里使用spring-AMQP
提供的RabbitTemplate
这里原本的场景是网关来路由请求,那么通过rabbitmq之后,可以直接通过rabbitmq来发送消息,网关作为生产者(发送消息),接口服务模块作为消费者,接收网关模块发送的消息,然后返回数据。
最开始采用是网关异步发送消息,在接口服务模块处理完了消息之后再去发送一条消息,然后网关消费消息去做相关的返回。
同步发送消息
这样做的问题是会出现消息延迟的问题,不过对比异步的发送方式,延迟也不见得会更长,因为异步的发送消息我们还是需要两次的传输。
使用
RabbitTemplate.sendAndReceive() 方法
1 | Message interfaceModuleResult = rabbitTemplate.sendAndReceive(MQConstant.INTERFACE_ROUTE_EXCHANGE, queueName, message); |
这里的message用来封装用户的请求参数, 通过队列以及交换机来定位到具体的接口服务
消费者
这里涉及到交换机以及队列的概念
通过定义Bean 以及listener 来定义消费者
通过Bean
三个步骤
- 声明交换机
- 声明队列
- 绑定队列以及交换机
1 |
|
通过@RabbitListener
关于@RabbitListener
, 可以注册以下的内容
关键的信息在于 queues
以及 bindings
1 |
|
其中@Bindings
的内容如下
1 |
|
其中value 用于 定义队列 , exchange 用于绑定队列以及交换机 ,
需要注意的是 key 属性 , key是我们在消息发布者以及接受者路由的规则
打开rabbitmq的控制台可以看到 有一个 Routing Key
这个 Routing Key 实际上就是对应@Bindings
注解 里面的key
如果不定义key , 消息是无法在 发布者以及 接收者 之前传播的
这里定义 消息接收者如下 , 绑定了 MQConstant.INTERFACE_ROUTE_EXCHANGE
交换机
1 |
|