2021最新Java面经整理 | 中间件篇(三)RabbitMQ

2021最新Java面经整理 | 中间件篇(三)RabbitMQ

目录

一、RabbitMQ 简介

二、结构组成和核心组件(重点)

三、工作模式(交换机类型)

1、fanout(扇型交换机)

2、direct(直连交换机)

3、topic(主题交换机)

4、headers(头交换机)

四、消息的传递过程(重点)

1、发送消息过程

2、消费消息过程

五、持久化(重点)

1、队列持久化和消息持久化

2、持久化原理

六、事务

七、确认机制(重点)

1、Confirm 消息确认机制

2、Confirm 的三种方式

八、死信队列的应用

1、死信队列

2、过期消息

3、延时队列

4、应用

九、消息的重要问题

1、消息的顺序性(如何保证RabbitMQ消息的顺序性)

2、消息重复消费(如何保证消息不被重复消费?)

3、消息丢失(如何解决丢数据的问题?)

十、其他问题


一、RabbitMQ 简介

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ有如下特点,

  • 可靠性(Reliability),RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认
  • 灵活的路由(Flexible Routing),在消息进入队列之前,通过 Exchange 来路由消息的。
  • 消息集群(Clustering),多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 
  • 高可用(Highly Available Queues),队列可以在集群中的机器上进行镜像,在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol),RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 多语言客户端(Many Clients),RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
  • 管理界面(Management UI),RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
  • 跟踪机制(Tracing),如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System),RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

二、结构组成和核心组件(重点

我们先看下 RabbitMQ 的结构图,如下,图重分两大块:RabbitMQ Server 和 RabbitMQ Client,其中 Client,可以包含生产者(Publisher或Provider)和消费者(Consumer)。

RabbitMQ 结构图中有几个重要的概念:VHost(虚机主机)、Exchange(交换机)、Queue(队列)、Binding(绑定)。整个消息传递的流程都是围绕着几个组件来进行的。生产者把消息发布到Exchange上,然后Exchange把消息路由到与Exchange绑定的Queue队列中,消费者建立与Queue的连接后,消费消息。

1、Broker(服务节点)

RabbitMQ Server服务器,服务节点称为Broker。

2、Virtual Host(虚拟主机)

虚拟主机,标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

3、Exchange(交换机)

交换机,用来接收生产者(producer)发送的消息,并将这些消息路由给服务器中的队列(Queue)。

4、Queue(消息队列)

消息队列,用来保存消息直到发送给消费者它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

5、Binding(绑定)

绑定,用于交换机(Exchange)和消息队列(Queue)之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

6、Channel(信道)

信道,多路复用连接中的一条独立的双向数据流通道。新到是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过新到发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接(一个TCP链接,可以包含多个信道)

7、Producer(生产者)

Producer消息生产者,也是一个向交换器发布消息的客户端应用程序。

8、Consumer(消费者)

消息消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

10、Connection(连接)

网络连接,比如一个TCP连接。一个consumer需要和broker建立连接,以获取队列中的消息。

11、Message(消息)

消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息的路由模式)等。

三、工作模式(交换机类型)

Exchange分发消息时,根据交换机的类型不同,分发策略也不同。常见的交换机类型有四种:direct、fanout、topic、headers(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,很少使用)。

1、fanout(扇型交换机)

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中

上图所示,生产者(P)生产消息1将消息1推送到Exchange,由于Exchange Type=fanout这时候会遵循fanout的规则将消息推送到所有与它绑定Queue,也就是图上的两个Queue最后两个消费者消费。

2、direct(直连交换机)

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中

当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。

3、topic(主题交换机)

前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。它的约定是:

  • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如 “stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。
  • binding key与routing key一样也是句点号“. ”分隔的字符串。
  • binding key中可以存在两种特殊字符“ ”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue1中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。

4、headers(头交换机)

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

此外headers交换器和direct交换器完全一致,但是性能差了很多,很少使用。

四、消息的传递过程(重点)

RabbitMQ 的消息策略是怎样的呢?消息是怎样传递的呢?首先明确一点就是生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,这里还会对不符合路由规则(由Exchange Type决定规则)的消息进行丢弃掉。RabbitMQ 通过Binding将Exchange和Queue链接在一起,然后将消息准确的推送到对应的Queue中。

  • Routing Key:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。 
  • Binding Key:binding key 用来绑定(Binding)Exchange与Queue。当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。

1)消息的传递

2)结构图

要解释清楚 RabbitMQ 整个消息的传递过程,我们需要上面两张图来看,分两步:发送消息和消费消息

1、发送消息过程

Producer(生产者)向 RabbitMQ Server(Broker)发送消息,

  1. 获取Conection
  2. 获取Channel
  3. 定义Exchange和Queue
  4. 用一个RoutingKey将Queue Binding到一个Exchange上
  5. 通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,发送完成。

生产者需要关心:Exchange、Queue、Binding。

2、消费消息过程

Consumer(消费者)向 RabbitMQ Server(Broker)订阅消息,

  1. 获取Conection
  2. 获取Channel
  3. 指定一个Queue直接到它关心的Queue上取消息,取到消息后自己消费,按配置决定是否返回确认。

消费者只需要关心:Queue。

五、持久化(重点)

1、队列持久化和消息持久化

重启RabbitMQ后,队列和交换器都会丢失(随同里面的消息),原因在于每个队列和交换器的durable属性,该属性默认为false。RabbitMQ提供了durable属性来实现持久化,保证断电后消息不丢失。RabbitMQ 的持久化分:交换机、队列持久化和消息持久化。

  • 交换机、队列持久化:持久化交换机(Exchange)和队列(Queue),设置交换机和队列的durable属性为ture。
  • 消息持久化:持久化消息本身,设置消息的“投递模式”属性设置为 2 (delivery_mode=2)。

所以,RabbitMQ 的消息持久化,需要做到以下三点:

  1. 把消息的投递模式选项设置为2(delivery_mode=2)
  2. 将消息发送到持久化的交换机(durable=true)
  3. 消息到达持久化队列(durable=true)

注意,如果原先有非持久的交换器或者队列,需要删除后才可重新创建,否则就创建其他名称的交换器或者队列,代码如下:

//声明持久交换器
channel.ExchangeDeclare(
    "HelloExchange",    //交换器名称
    ExchangeType.Direct,//交换器类型
    true,              //是否持久话
    false,              //是否自动删除
    null                //关于交换器的详细设置,键值对形式
    );
//声明持久队列
channel.QueueDeclare(
    "HelloQueue",//队列名称
    true,       //是否持久化
    false,       //是否只对首次声明的队列可见
    false,       //是否自动删除
    null         关于队列和队列内消息的详细设置,键值对形式
    );
//发布持久消息
string msg_str = "这是生产者第一次发布的消息";
IBasicProperties msg_pro = channel.CreateBasicProperties();
msg_pro.ContentType = "text/plain";//发布的数据类型
msg_pro.DeliveryMode = 2;//标记持久化

2、持久化原理

RabbitMQ 的持久化机制是:把持久化的数据写入磁盘上的一个持久化日志文件,在做数据恢复时,从磁盘读取持久化的数据重建。当发布一条持久化的消息到持久化的交换机上时,RabbitMQ 会在消息提交到日志文件后才发送响应。如果RabbitMQ重启,服务器会自动重建交换机和队列,重播持久性日志文件中的消息到合适的队列或者交换机上。

消息持久化对RabbitMQ的性能有较大影响,写入磁盘要比写入内存慢很多,而且会极大的减少RabbitMQ服务器每秒可处理的消息总数,导致消息吞度量降低至少10倍的情况并不少见。持久化消息在RabbitMQ内建集群环境中工作的并不好,实际上集群上的队列均匀分布在各个节点上而且没有冗余,如果运行a队列的节点崩溃了,那么直到节点恢复前,这个队列就从整个集群消失了,而且这个节点上的所有队列不可用,而且持久化队列也无法重建。

六、事务

RabbitMQ 支持 AMQP 事务,来处理消息丢失的情况(确认机制比事务更轻量)。AMQP事务与数据库事务不同

AMQP事务:提供的一种保证消息成功投递的方式,通过将信道开启事务模式后,利用信道 Channel 的三个命令来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递。

  • channel.txSelect(): 开启事务
  • channel.txCommit() :提交事务
  • channel.txRollback() :回滚事务

RabbitMQ的事务非常消耗性能,不但会降低大约2-10倍的消息吞度量,而且会使生产者应用程序之间产生同步,与使用MQ解耦异步系统的初衷相背离。

七、确认机制(重点)

1、Confirm 消息确认机制

相比较事务模式,RabbitMQ 提供了更好的方案来保证消息投递:发送方确认模式

和事务类似,我们需要将信道 channel 设置为 confirm 模式,而且只能通过重新创建信道来关闭该设置。一旦信道进入 confirm 模式,所有的信道上发布的消息都会被指派一个唯一的ID当消息被投递到队列后,信道就会发送一个发送方确认模式给生产者程序,使得生产者知道消息安全到达队列了如果发送的消息丢失,RabbitMQ会发送一条nack消息,告诉生产者消息丢失,生产者会再次发送消息(re-publish)。发送发确认模式最大的好处是它们是异步的,没有回滚的概念,更加轻量级,对性能的影响也几乎忽略不计。

2、Confirm 的三种方式

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者 (包含消息的唯一ID) ,这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag 域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

开启confirm模式,

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();

1)Confirm 普通模式(单条)

每发送一条消息,调用 channel.waitForConfirms() 方法等待服务端confirm,这实际上是一种串行的confirm,每publish一条消息之后就等待服务端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。

// 开启发送方确认模式
channel.confirmSelect();
String message = String.format("时间 => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) { // confirm 普通单条
    System.out.println("消息发送成功" );
}

2)Confirm 批量模式(批量)

每发送一批消息之后,调用 channel.waitForConfirmsOrDie() 方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降。

// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    String message = String.format("时间 => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
System.out.println("全部执行完成");

3)Confirm 异步模式(异步)

RabbitMQ 使用 channel.addConfirmListener()异步监听,异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。

// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    String message = String.format("时间 => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未确认消息,标识:" + deliveryTag);
    }
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
    }
});

有测试总结:Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。

八、死信队列的应用

1、死信队列

“死信”是RabbitMQ中的一种消息机制。死信队列(死信交换机),又称 dead-letter-exchange(DLX)。当一条消息在一个队列中变成死信后,它会被重新发布到一个交换机中,这个交换机就是 DLX。

队列中的消息在以下三种情况下会变成死信,

  1. 消息被拒绝(reject ,nack),并且 requeue = false(不再重新投递)
  2. 消息 TTL 过期(TTL,即Time To Live,是指消息的存活时间)
  3. 队列超过最长长度

消息进入死信队列的过程:消息 -> 队列 (触发以上条件)-> DLX交换机 -> DLK队列。

2、过期消息

RabbitMQ 中存在两种方可设置消息的过期时间,

  • 队列设置:通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,在队列申明的时候使用 x-message-ttl 参数,单位为毫秒。
  • 单个消息设置:通过对消息本身进行设置,那么每条消息的过期时间都不一样,设置消息属性的 expiration 参数的值,单位为毫秒。

如果同时使用这两种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个死信消息。

3、延时队列

RabbitMQ 中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列消费者监听死信交换器绑定的队列,而不要监听消息发送的队列

4、应用

1)场景

订单下单10秒后,若用户没有付款,则系统自动取消订单。

2)分析

以上适合使用延时队列解决,RabbitMQ 的延时队列可以由 过期消息+死信队列 来实现。

  • 过期消息+死信队列 实现延时队列。
  • 过期消息通过队列中设置 x-message-ttl 参数实现。
  • 死信队列通过在队列申明时,给队列设置 x-dead-letter-exchange 参数,然后另外申明一个队列绑定x-dead-letter-exchange对应的交换器

不使用传统的轮询方式,优势:若数据库数据量大,则定时轮询就会特别消耗资源,拖垮服务器,且响应慢

3)实现

大致流程,

九、消息的重要问题

1、消息的顺序性(如何保证RabbitMQ消息的顺序性)

有的业务场景需要我们,保证RabbitMQ消息的顺序性。当 RabbitMQ 中一个queue对应多个consumer的时候,无法保证消息消费的顺序性。解决方案:

1)原queue拆分成多个queue,一个queue对应一个consumer,就是多一些queue而已,确实是麻烦点;

2)或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理;

其他:针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中。然后只用一个消费者去消费该队列。同一个queue里的消息一定是顺序消息的。我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。例如B消息的业务应该保证在A消息后业务后执行,那么我们保证A消息先进queueA,B消息后进queueB就可以了。

2、消息重复消费(如何保证消息不被重复消费?)

如何保证消息不被重复消费?保证消息不被重复消费的关键是保证消息队列的幂等性,这个问题针对业务场景来答分以下几点:

1)如果你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

2)如果你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

综上,消息队列出现消息重复的原因有多种,消息队列并不能保证消息的唯一,所以我们只能在业务层面上做这些控制。

  • 全局唯一id,比如通过消息队列来生成订单,那订单号就是唯一的,在进行插入数据库之前先判断是否这个订单号是否已经存在了,如果已经存在了,说明已经消费过这条消息了,直接丢弃。
  • 消息确认表,将消息标示号存入redis或者数据库,在进行消费之前进行一个判断。

总之解决的方案很多,要看具体业务场景。

3、消息丢失(如何解决丢数据的问题?)

如何解决丢数据的问题?

1)生产者丢数据

生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息

transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。

然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

2)消息队列丢数据

处理消息队列丢数据的情况,一般是开启消息持久化。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步:

  • 将queue的持久化标识durable设置为true,则代表是一个持久的队列
  • 发送消息的时候将deliveryMode=2

这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)

3)消费者丢数据

启用手动确认模式可以解决这个问题,

  • 自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。
  • 手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。
  • 不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

十、其他问题

1、什么是 rabbitmq

采用 AMQP 高级消息队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦

2、为什么要使用 rabbitmq

  • (1)在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;
  • (2)拥有持久化的机制,进程消息,队列中的信息也可以保存下来。
  • (3)实现消费者和生产者之间的解耦。
  • (4)对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。
  • (5)可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。

3、使用 rabbitmq 的场景

  • (1)服务间异步通信
  • (2)顺序消费
  • (3)定时任务
  • (4)请求削峰

4、如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?

1)发送方确认模式

将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。

一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

2)接收方确认机制

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。

这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;

下面罗列几种特殊情况:

(1)如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)

(1)2如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。

 

5、如何避免消息重复投递或重复消费?

在消息生产时,MQ 内部针对每条生产者发送的消息生成一个 inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重的依据,避免同一条消息被重复消费。

6、消息基于什么传输?

由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。

7、消息如何分发?

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。

8、消息怎么路由?

消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。

常用的交换器主要分为一下三种:

  • fanout:如果交换器收到消息,将会广播到所有绑定的队列上
  • direct:如果路由键完全匹配,消息就被投递到相应的队列
  • topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符

9、如何确保消息不丢失?

消息持久化,当然前提是队列必须持久化。

RabbitMQ 确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit 会在消息提交到日志文件后才发送响应。一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ 会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前 RabbitMQ 重启,那么 Rabbit 会自动重建交换器和队列(以及绑定),并重新发布持久化日志文件中的消息到合适的队列。

 

10、RabbitMQ 有几种广播类型?

  • direct(默认方式):最基础最简单的模式,发送方把消息发送给订阅方,如果有多个订阅者,默认采取轮询的方式进行消息发送。
  • headers:与 direct 类似,只是性能很差,此类型几乎用不到。
  • fanout:分发模式,把消费分发给所有订阅者。
  • topic:匹配订阅模式,使用正则匹配到消息队列,能匹配到的都能接收到。

11、RabbitMQ 持久化有什么缺点?

持久化的缺地就是降低了服务器的吞吐量,因为使用的是磁盘而非内存存储,从而降低了吞吐量。可尽量使用 ssd 硬盘来缓解吞吐量的问题。

12、RabbitMQ 节点的类型有哪些?

磁盘节点:消息会存储到磁盘。

内存节点:消息都存储在内存中,重启服务器消息丢失,性能高于磁盘类型。

13、RabbitMQ 集群中唯一一个磁盘节点崩溃了会发生什么情况?

如果唯一磁盘的磁盘节点崩溃了,不能进行以下操作:

  • 不能创建队列
  • 不能创建交换器
  • 不能创建绑定
  • 不能添加用户
  • 不能更改权限
  • 不能添加和删除集群节点

唯一磁盘节点崩溃了,集群是可以保持运行的,但你不能更改任何东西。

14、RabbitMQ 怎么保证消息的稳定性?

提供了事务的功能和confirm(确认)模式,推荐confirm模式。

15、要保证消息持久化成功的条件有哪些?

  • 声明队列必须设置持久化 durable 设置为 true.
  • 消息推送投递模式必须设置持久化,deliveryMode 设置为 2(持久)。
  • 消息已经到达持久化交换器。
  • 消息已经到达持久化队列。

以上四个条件都满足才能保证消息持久化成功。

16、RabbitMQ 每个节点是其他节点的完整拷贝吗?为什么?

存储空间的考虑:如果每个节点都拥有所有队列的完全拷贝,这样新增节点不但没有新增存储空间,反而增加了更多的冗余数据;

性能的考虑:如果每条消息都需要完整拷贝到每一个集群节点,那新增节点并没有提升处理消息的能力,最多是保持和单节点相同的性能甚至是更糟。

17、RabbitMQ 中 vhost 的作用是什么?

vhost:每个 RabbitMQ 都能创建很多 vhost,我们称之为虚拟主机,每个虚拟主机其实都是 mini 版的RabbitMQ,它拥有自己的队列,交换器和绑定,拥有自己的权限机制。

18、RabbitMQ 集群搭建需要注意哪些问题?

各节点之间使用“–link”连接,此属性不能忽略。

各节点使用的 erlang cookie 值必须相同,此值相当于“秘钥”的功能,用于各节点的认证。

整个集群中必须包含一个磁盘节点。

19、RabbitMQ 对集群节点停止顺序有要求吗?

RabbitMQ 对集群的停止的顺序是有要求的,应该先关闭内存节点,最后再关闭磁盘节点。如果顺序恰好相反的话,可能会造成消息的丢失。

20、RabbitMQ 的消息是怎么发送的?

首先客户端必须连接到 RabbitMQ 服务器才能发布和消费消息,客户端和 rabbit server 之间会创建一个 tcp 连接,一旦 tcp 打开并通过了认证(认证就是你发送给 rabbit 服务器的用户名和密码),你的客户端和 RabbitMQ 就创建了一条 amqp 信道(channel),信道是创建在“真实” tcp 上的虚拟连接,amqp 命令都是通过信道发送出去的,每个信道都会有一个唯一的 id,不论是发布消息,订阅队列都是通过这个信道完成的。

21、RabbitMQ 怎么避免消息丢失?

把消息持久化磁盘,保证服务器重启消息不丢失。 每个集群中至少有一个物理磁盘,保证消息落入磁盘。

22、RabbitMQ 有哪些重要的组件?

  • ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。
  • Channel(信道):消息推送使用的通道。
  • Exchange(交换器):用于接受、分配消息。
  • Queue(队列):用于存储生产者的消息。
  • RoutingKey(路由键):用于把生成者的数据分配到交换器上。
  • BindingKey(绑定键):用于把交换器的消息绑定到队列上。

23、如何保证高可用的?RabbitMQ 的集群

1)主备模式

也称为 Warren (兔子窝) 模式。实现 rabbitMQ 的高可用集群,一般在并发和数据量不高的情况下,这种模式非常的好用且简单。也就是一个主/备方案,主节点提供读写,备用节点不提供读写。如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点,和 activeMQ 利用 zookeeper 做主/备一样,也可以一主多备。

2)远程模式

远程模式可以实现双活的一种模式,简称 shovel 模式,所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制。 Shovel 就是我们可以把消息进行数据中心的复制工作,我们可以跨地域的让两个 MQ 集群互联。

3)镜像模式

非常经典的 mirror 镜像模式,保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。mirror 镜像队列,目的是为了保证 rabbitMQ 数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步。对于 100% 数据可靠性解决方案,一般是采用 3 个节点。

4)多活模式

也是实现异地数据复制的主流模式,因为 shovel 模式配置比较复杂,所以一般来说,实现异地集群的都是采用这种双活 或者 多活模型来实现的。这种模式需要依赖 rabbitMQ 的 federation 插件,可以实现持续的,可靠的 AMQP 数据通信,多活模式在实际配置与应用非常的简单。rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 rabbitMQ 集群,各中心的rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。

24、如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

消息积压处理办法:临时紧急扩容:

先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

 

  • 1
    点赞
  • 3
    收藏
    觉得还不错? 一键收藏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值