本篇内容主要讲的就是错峰流控的问题

这个方面典型的使用场景就是秒杀业务用于流量削峰场景。

那么,为什么是RabbitMQ?

几种常见MQ的对比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

关于Erlang

Erlang - 维基百科,自由的百科全书 (wikipedia.org)

Erlang/ˈɜːrlæŋ/)是一种通用的并发函数式程序设计语言。Erlang也可以指Erlang/OTP的通称,开源电信平台(OTP)是Erlang的常用执行环境及一系列标准组件。

Erlang 执行环境为专有以下要求的系统设计:

Erlang是运作于虚拟机解释型语言,但是现在也包含有乌普萨拉大学高性能Erlang计划(HiPE)[4]开发的原生代码编译器,自R11B-4版本开始,Erlang也支持脚本方式执行。在编程范型上,Erlang属于多重典范编程语言,涵盖函数式并行分布式。循序执行的Erlang是一个及早求值, 单次赋值动态类型函数式编程语言

简单提一下,本篇文章的背景是在线接口项目,业务场景有较高的并发需求,因此需要做一些限流 (限流通过springcloud gateway的Ratelimiter实现,原理是令牌桶算法),削峰等,类似于天行数据TianAPI - 开发者API数据平台

因此业务上对消息的延迟要求较高,并且rabbitmq较为适合中小型项目,刚好之前学过一点rabbitmq,因此这里使用rabbitmq也算是水到渠成

简单介绍rabbitmq

RabbitMQ 是一种可靠、高效的消息中间件,它采用 AMQPAdvanced Message Queuing Protocol)协议,是一个开源的消息队列实现。RabbitMQ 实现了先进的消息路由和负载均衡机制,并且能够在不同应用之间传递消息。

RabbitMQ 的核心概念包括生产者、消费者、队列、交换器和绑定关系。生产者向队列发送消息,而消费者从队列中获取消息进行处理。队列是消息的缓存区,可以暂时保存消息,直到消费者准备好接收为止。交换器根据特定的路由规则将消息发送到相应的队列中,而绑定关系则将交换器和队列绑定起来,定义了消息如何从交换器路由到队列。

RabbitMQ 支持多种消息传输方式,包括点对点(Point-to-Point)模式、发布订阅(Publish/Subscribe)模式和主题(Topics)模式。点对点模式将消息发送给一个特定的消费者,而发布订阅模式将消息广播给多个消费者。主题模式则根据消息的主题(Topic)来进行路由。

RabbitMQ中的一些角色:

  • Broker:简单来说就是消息队列服务器实体。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  • producer:消息生产者,就是投递消息的程序。
  • consumer:消息消费者,就是接受消息的程序。
  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

关于AMQP

高级消息队列协议 - 维基百科,自由的百科全书 (wikipedia.org)

高阶讯息伫列协定Advanced Message Queuing Protocol(AMQP)是面向消息中间件提供的开放的应用层协定,其设计目标是对于讯息的排序、路由(包括点对点和订阅-发布)、保持可靠性、保证安全性[1]。AMQP规范了讯息传递方和接收方的行为,以使讯息在不同的提供商之间实现互操作性,就像SMTPHTTPFTP等协议可以创建交互系统一样。与先前的中间件标准(如Java讯息服务)不同的是,JMS在特定的API接口层面和实现行为上进行了统一,而高阶讯息伫列协定则关注于各种讯息如何以字节流的形式进行传递。因此,使用了符合协议实现的任意应用程序之间可以保持对讯息的创建、传递。

RabbitMQ安装

笔者这使用的是Vm ware 创建虚拟机 , 系统为centos7.6 , VMware16

docker安装

可以参考 centos7安装docker | dhx_'blog

安装rabbitmq

使用docker安装rabbitmq rabbitmq - Official Image | Docker Hub

拉取镜像

  • management版本,不指定默认为最新版本latest
1
docker pull rabbitmq:management

运行MQ容器:

RABBITMQ_DEFAULT_USER 用户名

RABBITMQ_DEFAULT_PASS 密码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
docker run \
--name rabbitmq \
--restart=always \
-e RABBITMQ_DEFAULT_USER=dhx \
-e RABBITMQ_DEFAULT_PASS=qwer \
--hostname mq1 \
-p15672:15672 \
-p5672:5672 \
-d \
rabbitmq:management

docker run \
-e RABBITMQ_DEFAULT_USER=dhx \ # 用户
-e RABBITMQ_DEFAULT_PASS=qwer \ # 密码
--restart=always \ # 自动启动
--name rabbitmq \ # 容器名
--hostname mq1 \ # 主机名
-p 15672:15672 \ # UI界面端口
-p 5672:5672 \ # 消息通信端口
-d \ # 后台运行
rabbitmq:management # 使用的镜像的名称 : rabbitmq

docker ps 查看容器运行情况

docker logs ${container} 查看容器运行日志

安装好了之后访问 http://${IP}:15672

账号密码为上面的配置参数中的数据

生产者

引入依赖

版本管理交给springboot

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置rabbitmq

1
2
3
4
5
6
7
spring: 
rabbitmq:
username: dhx
password: qwer
host: 192.168.159.134
port: 5672 # 端口
virtual-host: / # 虚拟主机

消息发布

这里使用spring-AMQP提供的RabbitTemplate

这里原本的场景是网关来路由请求,那么通过rabbitmq之后,可以直接通过rabbitmq来发送消息,网关作为生产者(发送消息),接口服务模块作为消费者,接收网关模块发送的消息,然后返回数据。

最开始采用是网关异步发送消息,在接口服务模块处理完了消息之后再去发送一条消息,然后网关消费消息去做相关的返回。

流量削峰流程图

同步发送消息

这样做的问题是会出现消息延迟的问题,不过对比异步的发送方式,延迟也不见得会更长,因为异步的发送消息我们还是需要两次的传输。

使用RabbitTemplate.sendAndReceive() 方法

1
Message interfaceModuleResult = rabbitTemplate.sendAndReceive(MQConstant.INTERFACE_ROUTE_EXCHANGE, queueName, message);

这里的message用来封装用户的请求参数, 通过队列以及交换机来定位到具体的接口服务

消费者

这里涉及到交换机以及队列的概念

通过定义Bean 以及listener 来定义消费者

通过Bean

三个步骤

  1. 声明交换机
  2. 声明队列
  3. 绑定队列以及交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}

/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}

/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
}

通过@RabbitListener

关于@RabbitListener, 可以注册以下的内容

关键的信息在于 queues 以及 bindings

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(RabbitListeners.class)
public @interface RabbitListener {
String id() default "";

String containerFactory() default "";

String[] queues() default {};

Queue[] queuesToDeclare() default {};

boolean exclusive() default false;

String priority() default "";

String admin() default "";

QueueBinding[] bindings() default {};

String group() default "";

String returnExceptions() default "";

String errorHandler() default "";

String concurrency() default "";

String autoStartup() default "";

String executor() default "";

String ackMode() default "";

String replyPostProcessor() default "";

String messageConverter() default "";

String replyContentType() default "";

String converterWinsContentType() default "true";
}

其中@Bindings的内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding {
Queue value();

Exchange exchange();

String[] key() default {};

String ignoreDeclarationExceptions() default "false";

Argument[] arguments() default {};

String declare() default "true";

String[] admins() default {};
}

其中value 用于 定义队列 , exchange 用于绑定队列以及交换机 ,

需要注意的是 key 属性 , key是我们在消息发布者以及接受者路由的规则

打开rabbitmq的控制台可以看到 有一个 Routing Key

这个 Routing Key 实际上就是对应@Bindings 注解 里面的key

如果不定义key , 消息是无法在 发布者以及 接收者 之前传播的

这里定义 消息接收者如下 , 绑定了 MQConstant.INTERFACE_ROUTE_EXCHANGE 交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MQConstant.IP_ANALYSIS_QUEUE),
exchange = @Exchange(name =MQConstant.INTERFACE_ROUTE_EXCHANGE, type = ExchangeTypes.DIRECT),
key=MQConstant.IP_ANALYSIS_QUEUE
))
public byte[] analysisIP(Message message){
Map<String, Object> param = MQUtil.getParamFromMessage(message);
Object ip = param.get("ip");
if(ip instanceof JSONArray){
String ipAddr = ((JSONArray) ip).get(0).toString();
// String ipAddr=(String)ip; 强制转换不行 , 经过序列化传输的 IP 类名是 JsonArray
if(StringUtils.isEmpty(ipAddr)){
return MQUtil.result2Byte(ResultUtil.error());
}
// 正常逻辑
try{
InetAddress inetAddress = InetAddress.getByName(ipAddr);
if (inetAddress instanceof Inet4Address) {
String location = IpUtil.getIpLocation(ipAddr);
BaseResponse<String> success = ResultUtil.success(location);
return MQUtil.result2Byte(success);
} else {
throw new BusinessException(ErrorCode.PARAMS_ERROR,"参数不是合法的IPv4地址!");
}
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}else{
return MQUtil.result2Byte(ResultUtil.error());
}
}