java面试题:MQ、RabbitMQ面试题(面试必问,精华版)

目录

1.为什么要引入MQ/RabbitMQ(中间件),直接读写数据库不行吗?

2、什么是RabbitMQ?

3、RabbitMQ有什么优缺点?

优点:

缺点:

4、RabbitMQ组件介绍

5、交换机的几种类型?

拓展:消息怎么路由?

拓展:消息如何分发?

6、SprongBoot整合RabbitMQ:

7、RabbitMQ如何保证不丢数据?

1、可靠抵达-ConfirmCallback

2、可靠抵达-ReturnCallback

3、可靠抵达-Ack消息确认机制

8、如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?(类似7)

发送方确认模式

接收方消息确认机制

9、如何保证RabbitMQ不被重复消费?


1.为什么要引入MQ/RabbitMQ(中间件),直接读写数据库不行吗?

1、在分布式系统下中间件具备异步处理,流量削峰等一系列高级功能;

2、中间件可以实现生产者和消费者之间的解耦。

3、拥有持久化的机制,进程消息,队列中的信息也可以保存下来。

4、对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。

5、可以使用消息队列达到异步下单的效果,后台进行逻辑下单。


2、什么是RabbitMQ?

RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件,核心思想是生产者不会将消息直接发送给队列,消息在发给客户端时会先发给交换机,然后再由交换机发送给对应的队列。


3、RabbitMQ有什么优缺点?

优点:

1、解耦

系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

2、异步

将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

3、削峰

并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

缺点:

1、一致性问题

A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。

2、系统的可用性降低

系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。

3、系统的复杂性提高

引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?


4、RabbitMQ组件介绍

Broker:简单来说就是消息队列服务器实体。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue:按照路由规则绑定起来。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个oroker!里可以开设多个vhost,用作不同用户的权限分离。

producer:消息生产者,就是投递消息的程序。

consumer:消息消费者,就是接受消息的程序。

channel:消息通道,在客户端的每个连接里,可建立多个channel,.每个channelf代表一个会话任务。


5、交换机的几种类型?

直连交换机(Direct Exchange)根据消息携带的路由键将消息投递给对应队列,它是完全匹配、单播的模式。

扇型交换机(Fanout Exchange)这个交换机没有路由键概念,就算你绑了路由键也是无视的。这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

主题交换机(Topic Exchange)发布订阅模式这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。*(星号)用来表示一个单词(必须出现的),#(井号)用来表示任意数量(零个或多个)单词

拓展:消息怎么路由?

消息提供方->路由->一至多个队列

消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。

通过队列路由键,可以把队列绑定到交换器上。

消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);

常用的交换器主要分为一下三种

1.fanout:如果交换器收到消息,将会广播到所有绑定的队列上

2.direct:如果路由键完全匹配,消息就被投递到相应的队列

3.topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符


拓展:消息如何分发?

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能。


6、SprongBoot整合RabbitMQ:

1.引入spring-boot-starter--amqp

2.application.yml配置

3.测试RabbitMQ

  • 1.AmgpAdmin:管理组件,用来创建队列、绑定关系等等
  • 2.RabbitTemplate:消息发送处理组件,用来收发消息
  • 3.RabbitListener:用来监听消息

7、RabbitMQ如何保证不丢数据?

保证消息不丢失,可靠抵达,可以使用事务消息,但性能下降250倍,为此引入RabbitMQ的消息确认机制

  • publisher→Broker: confirmCallback确认模式
  • Exchange→Queue:returnCallback未投递到queue退回模式
  • Queue→consumer: ack机制

1、可靠抵达-ConfirmCallback

消息只要被broker接收到就会执行confirmCallback,如果是cluster模式,需要所有broker接收到才会调用confirmCallback,表示message已经到达服务器。

2、可靠抵达-ReturnCallback

用到return退回模式,保证消息一定要投递到目标queue里,如果未能投递到目标queue里将调用returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

3、可靠抵达-Ack消息确认机制

消费者获取到消息,成功处理,可以回复Ack给Broker。


8、如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?(类似7)

发送方确认模式

1.将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。

2.一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

3.如果 RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(notacknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

接收方消息确认机制

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;


9、如何保证RabbitMQ不被重复消费?

1、在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;

2、在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。

这个问题针对业务场景来答分以下几点:

1、 拿到这个消息做数据库的insert操作。然后给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

2、 拿到这个消息做Redis的set的操作,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

3、 如果上面两种情况还不行。准备一个第三方介质,来做消费记录。以Redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入Redis。那消费者开始消费前,先去Redis中查询有没消费记录即可。


10、购物车同步redis,异步mysql场景中,mysql数据一直写入不进去怎么办?

死信的概念 queue 中的某些消息无法被消费

死信的来源:
  • 消息 TTL 过期 (TTL RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间
  • 队列达到最大长度(队列满了,无法再添加数据到 mq )
  • 消息被拒绝(basic.reject basic.nack)并且 requeue=false
延迟队列概念:
延时队列就是用来存放需要在指定时间被处理的元素的队列
延迟队列使用场景
1. 订单在十分钟之内未支付则自动取消
2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
Rabbitmq 插件实现延迟队列:

在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。
 

插件源码地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下载地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安装:

进入插件安装目录
{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)
下载插件
rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

    1

(如果下载的文件名称不规则就手动重命名一下如:
rabbitmq_delayed_message_exchange-0.0.1.ez)
启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

(关闭插件)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange


插件使用

通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
x-delayed-message是插件提供的类型,并不是rabbitmq本身的

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...


发送消息的时候通过在header添加"x-delay"参数来控制消息的延时时间

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
// ... more code ...


使用示例:

消息发送端:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    // 队列名称
    private final static String EXCHANGE_NAME="delay_exchange";
    private final static String ROUTING_KEY="key_delay";

    @SuppressWarnings("deprecation")
    public static void main(String[] argv) throws Exception {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // 声明x-delayed-type类型的exchange
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true,
                false, args);


        Map<String, Object> headers = new HashMap<String, Object>();
        //设置在2016/11/04,16:45:12向消费端推送本条消息
        Date now = new Date();
        Date timeToPublish = new Date("2016/11/04,16:45:12");

        String readyToPushContent = "publish at " + sf.format(now)
                + " \t deliver at " + sf.format(timeToPublish);

        headers.put("x-delay", timeToPublish.getTime() - now.getTime());

        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
                .headers(headers);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(),
                readyToPushContent.getBytes());

        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}


消息接收端:

import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "delay_queue";
    private final static String EXCHANGE_NAME="delay_exchange";

    public static void main(String[] argv) throws Exception,
            java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.queueDeclare(QUEUE_NAME, true,false,false,null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
        SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            System.out.println("****************WAIT***************");
            while(true){
                QueueingConsumer.Delivery delivery = queueingConsumer
                        .nextDelivery(); //

                String message = (new String(delivery.getBody()));
                System.out.println("message:"+message);
                System.out.println("now:\t"+sf.format(new Date()));
            }

        } catch (Exception exception) {
            exception.printStackTrace();

        }

    }
}


启动接收端,启动发送端
运行结果:

****************WAIT***************
message:publish at 2016-11-04 16:44:16.887      deliver at 2016-11-04 16:45:12.000
now:    2016-11-04 16:45:12.023


结果显示在我们2016-11-04 16:45:12.023接收到了消息,距离我们设定的时间2016-11-04 16:45:12.023有23毫秒的延迟

Note:使用rabbitmq-delayed-message-exchange插件时发送到队列的消息数量在web管理界面可能不可见,不影响正常功能使用

Note :使用过程中发现,当一台启用了rabbitmq-delayed-message-exchange插件的RAM节点在重启的时候会无法启动,查看日志发现了一个Timeout异常,开发者解释说这是节点在启动过程会同步集群相关数据造成启动超时,并建议不要使用Ram节点

  • 1
    点赞
  • 34
    收藏
    觉得还不错? 一键收藏
  • 打赏
    打赏
  • 0
    评论
一:RabbitMQ 中的 broker 是指什么?cluster 又是指什么? 问二:什么是元数据?元数据分为哪些类型?包括哪些内容?与 cluster 相关的元数据 有哪些?元数据是如何保存的?元数据在 cluster 中是如何分布的? 问三:RAM node 和 disk node 的区别? 问四:RabbitMQ 上的一个 queue 中存放的 message 是否有数量限制? 问五:RabbitMQ 概念里的 channel、exchange 和 queue 这些东东是逻辑概念,还是对应着进程实体?这些东东分别起什么作用? 问六:vhost 是什么?起什么作用? 问七:在单 node 系统和多 node 构成的 cluster 系统中声明 queue、exchange ,以及 进行 binding 会有什么不同? 问八:客户端连接到 cluster 中的任意 node 上是否都能正常工作? 问九:若 cluster 中拥有某个 queue 的 owner node 失效了,且该 queue 被声明具有 durable 属性,是否能够成功从其他 node 上重新声明该 queue ? 问十:cluster 中 node 的失效会对 consumer 产生什么影响?若是在 cluster 中创建了 mirrored queue ,这时 node 失效会对 consumer 产生什么影响? 问十一:能够在地理上分开的不同数据中心使用 RabbitMQ cluster 么? 问十二:为什么 heavy RPC 的使用场景下不建议采用 disk node ? 问十三:向不存在的 exchange 发 publish 消息会发生什么?向不存在的 queue 执行 consume 动作会发生什么? 问十四:routing_key 和 binding_key 的最大长度是多少? 问十五:RabbitMQ 允许发送的 message 最大可达多大? 问十六:什么情况下 producer 不主动创建 queue 是安全的? 问十七:“dead letter”queue 的用途? 问十八:为什么说保证 message 被可靠持久化的条件是 queue 和 exchange 具有 durable 属性,同时 message 具有 persistent 属性才行? 问十九:什么情况下会出现 blackholed 问? 问二十:如何防止出现 blackholed 问? 问二十一:Consumer Cancellation Notification 机制用于什么场景? 问二十二:Basic.Reject 的用法是什么? 问二十三:为什么不应该对所有的 message 都使用持久化机制? 问二十四:RabbitMQ 中的 cluster、mirrored queue,以及 warrens 机制分别用于解决 什么问?存在哪些问
以下是一些 RabbitMQ 面试题: 1. 什么是 RabbitMQ? 答:RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)。 2. RabbitMQ 的主要组件是什么? 答:RabbitMQ 的主要组件包括生产者、消费者、交换机、队列和绑定。 3. RabbitMQ 中的消息确认机制是什么? 答:RabbitMQ 中的消息确认机制包括生产者确认和消费者确认。生产者确认指消息成功发送到 RabbitMQ 代理;消费者确认指消息成功被消费。 4. RabbitMQ 中的交换机类型是什么? 答:RabbitMQ 中的交换机类型包括直连交换机、扇形交换机、主交换机和头部交换机。 5. RabbitMQ 中的持久化是什么? 答:RabbitMQ 中的持久化指消息或队列在 RabbitMQ 代理重启后仍然存在的能力。 6. RabbitMQ 中的消息路由是如何工作的? 答:RabbitMQ 中的消息路由是由交换机和绑定共同实现的。生产者将消息发送到交换机,交换机将消息路由到符合绑定条件的队列。 7. RabbitMQ 中的延迟队列是什么? 答:RabbitMQ 中的延迟队列指一种特殊的队列,它会在指定的时间后才将消息发送到消费者。常用于延迟任务处理和定时任务等场景。 8. RabbitMQ 中如何保证消息的顺序性? 答:RabbitMQ 中可以通过单个队列和单个消费者来保证消息的顺序性。也可以通过分片队列和分片消费者来实现消息的顺序性。 9. RabbitMQ 中的死信队列是什么? 答:RabbitMQ 中的死信队列指一种特殊的队列,它会接收由其他队列中被拒绝、过期或达到最大重试次数的消息。常用于处理异常或不可处理的消息。 10. RabbitMQ 中的集群是如何工作的? 答:RabbitMQ 中的集群是由多个 RabbitMQ 代理节点组成的,它们共享同一个虚拟主机和队列。集群中的节点可以相互通信,实现消息的负载均衡和高可用性。

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

91科技

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值