Android防重复点击 Git element svn EasyCVR awk extjs cassandra angular material jqgrid ldap tags bootstrap管理系统模板 java后台框架 java运行软件 input边框颜色 oracle增加主键 python学习入门 python程序 python基本语法 python中time python位操作 python排序 java变量类型 java中的继承 java环境变量配置 java学习课程 java创建集合 java语言入门 java异常 java8函数式编程 linuxsudo命令 凯立德地图免费下载 易语言进度条 脚本下载 ps怎么插入表格 风火云 视频相册制作软件 安卓刷机精灵 js跳出for循环
当前位置: 首页 > 学习教程  > 编程语言

初步了解rabbitMQ,和其几种模式(含完整代码)

2020/10/16 17:58:04 文章标签:

初步了解rabbitMQ,和其几种模式(含完整代码)rabbitMQ简介开始引入依赖及配置文件配置代码阶段简单直连模式工作模式发布订阅模式路由模式主题模式RPC模式完整代码rabbitMQ简介 RabbitMQ是实现了高级消息队列协议(AMQP&#xff09…

初步了解rabbitMQ,和其几种模式(含完整代码)

  • rabbitMQ简介
  • 开始
    • 引入依赖及配置文件配置
    • 代码阶段
      • 简单直连模式
      • 工作模式
      • 发布订阅模式
      • 路由模式
      • 主题模式
      • RPC模式
  • 完整代码

rabbitMQ简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)

  • 主要特性

可伸缩性:集群服务
默认模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。

消息持久化:
从内存持久化消息到硬盘,再从硬盘加载到内存
所有队列中的消息都以append的方式写到一个文件中,当这个文件的大小超过指定的限制大小后,关闭这个文件再创建一个新的文件供消息的写入。文件名(*.rdq)从0开始然后依次累加。当某个消息被删除时,并不立即从文件中删除相关信息,而是做一些记录,当垃圾数据达到一定比例时,启动垃圾回收处理,将逻辑相邻的文件中的数据合并到一个文件中。
rabbitmq在启动时会创建msg_store_persistent,msg_store_transient两个进程,一个用于持久消息的存储,一个用于内存不够时,将存储在内存中的非持久化数据转存到磁盘中。所有队列的消息的写入和删除最终都由这两个进程负责处理,而消息的读取则可能是队列本身直接打开文件进行读取,也可能是发送请求由msg_store_persisteng/msg_store_transient进程进行处理。
在进行消息的存储时,rabbitmq会在ets表中记录消息在文件中的映射,以及文件的相关信息。消息读取时,根据消息ID找到该消息所存储的文件,在文件中的偏移量,然后打开文件进行读取。消息的删除只是从ets表删除指定消息的相关信息,同时更新消息对应存储的文件的相关信息(更新文件有效数据大小)。
由于执行消息删除操作时,并不立即对在文件中对消息进行删除,也就是说消息依然在文件中,仅仅是垃圾数据而已。当垃圾数据超过一定比例后(默认比例为50%),并且至少有三个及以上的文件时,rabbitmq触发垃圾回收。垃圾回收会先找到符合要求的两个文件(根据#file_summary{}中left,right找逻辑上相邻的两个文件,并且两个文件的有效数据可在一个文件中存储),然后锁定这两个文件,并先对左边文件的有效数据进行整理,再将右边文件的有效数据写入到左边文件,同时更新消息的相关信息(存储的文件,文件中的偏移量),文件的相关信息(文件的有效数据,左边文件,右边文件),最后将右边的文件删除。

开始

引入依赖及配置文件配置

  • 引入依赖

    首先,先引入必要依赖

		<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.1.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.3.1.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <!--rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

配置文件:application.yml

server:
  port: 8080 #请求端口号
  servlet:
    context-path: /rabbit #请求前缀
spring:
  profiles:
    active: dev

开发环境配置文件:application-dev.yml(各个环境可以用不同的配置,图方便直接在主配置文件写死了dev的,也可以不用创建这个文件,直接在主配置文件中写)

dev:
  main: holleWord

#rabbit配置
spring:
  rabbitmq:
    # rabbit地址
    host: 127.0.0.1
    # rabbit端口号
    port: 5672
    # 用户账号和密码
    username: guest
    password: guest
    #rabbit项目名,每个virtualHost的队列是隔离的,相当于数据库
    virtual-host: /rabbit
    #开启Publisher Confirms 模式,消息发送到交换器后触发回调。
    #publisher-confirms: true
    #开启PublisherReturn 模式,交换机将消息发送到对应队列失败时触发
    #publisher-returns: true
    listener:
      #设置监听容器(Listener container)类型,如不设置,将会默认为SimpleRabbitListenerContainerFactory,且下面的direct配置不生效
      type: direct
      direct:
        #设置监听为手动答应模式
        acknowledge-mode: manual

代码阶段

p:生产者,x:交换机,c:消费者,红色方块:队列

简单直连模式

生产者发布消息后放到队列中,消费者监听到消息后立马将消息消费,并从队列中删除该消息,示例图为:

创建一个交换机和队列,并绑定,创建队列时,有几个参数需要注意一下
durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
一般设置一下队列的持久化就好,其余两个就是默认false,具体还是要根据需求来

@Configuration
public class RabbitConfig {

    /**
     * 将队列绑定到交换机中,并绑定路由
     * @return
     */
    @Bean
    public Binding bindingExchange() {
        return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitEnum.RABBIT.getRoutingKey());
    }

    /**
     * 创建队列
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue(RabbitEnum.RABBIT.getQueue(),true);
    }

    /**
     * 创建交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        return new DirectExchange(RabbitEnum.RABBIT.getDirectExchange(), true, false);
    }
}

再创建一个类,用来发送消息,监听队列的消息,并消费

@Service
public class PushService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 推送方法
     */
    public void push(){
        System.out.println("开始推送");
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend(RabbitEnum.RABBIT.getDirectExchange(), RabbitEnum.RABBIT.getRoutingKey(),"helloWord", new CorrelationData("1"));
        System.out.println("结束推送");
    }

    /**
     * 推送队列的监听
     * 注解中为监听的队列
     */
    @RabbitListener(queues = "rabbit.direct.queue.push")
    public void processDirect(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("收到消息");
        System.out.println(str);
        channel.basicAck(tag,true);
    }
}

channel.basicAck(tag,true)这个步骤为手动答应,消费完成后,如果业务代码正确执行完了,就将消息从队列中删除,如果失败就要用到nack方法了,方法如下

channel.basicNack(tag,multiple,requeue)

tag:该消息的标识

multiple:是否批量拒绝,true:将一次性拒绝所有小于deliveryTag的消息

requeue:被拒绝的是否重新入队列,如果设置为true ,该消息则会重新入列

工作模式

生产者将消息放入到队列中,有多个监听在监听这个队列,但是一个消息只能被消费一次,哪个监听处理能力更快,就能消费更多的消息

创建队列和交换机过程和上面一样,但是消费消息有些不同,消费者代码为:

/**
     * 推送队列的监听1
     * @param
     * @param channel
     * @param tag
     * @throws IOException
     */
    @RabbitListener(queues = "rabbit.direct.queue.work")
    public void consume(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
        System.out.println("1:收到消息"+str);
        channel.basicAck(tag,true);
        Thread.sleep(100);
    }

    /**
     * 推送队列的监听1
     * @param
     * @param channel
     * @param tag
     * @throws IOException
     */
    @RabbitListener(queues = "rabbit.direct.queue.work")
    public void consume2(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
        System.out.println("2:收到消息"+str);
        channel.basicAck(tag,true);
        Thread.sleep(200);
    }

两个消费者监听同一个队列,哪个消费者的处理速度快,哪个消费的消息就越多

发布订阅模式

生产者将消息放入到交换机中,不指定队列,所有绑定该交换机的队列都能收到消息,消费者再消费掉对应队列的消息,交换机类型为fanout

在这里,绑定的交换机为fanout,绑定的方式也有些不同

@Bean
    public Binding bindingPublishExchange1() {
        return BindingBuilder.bind(publishQueue1()).to(publishFanoutExchange1());
    }

    @Bean
    public Queue publishQueue1() {
        return new Queue(RabbitEnum.PUBLISH.getQueue(),true);
    }

    @Bean
    public FanoutExchange publishFanoutExchange1() {
        return new FanoutExchange(RabbitEnum.PUBLISH.getFanoutExchange(),true,false);
    }

由于群发机制,fanout是不需要绑定路由的,只要是监听绑定了该交换机的队列,就都可以收到生产者发布的消息

路由模式

生产者发布消息时附带路由键,根据路由键匹配相应的队列,将消息投放至符合条件的队列中,多个队列可以绑定一个路由键。随后再被消费者消费掉消息

代码和第一个的基本上一模一样,只是创建多个队列,而那几个队列绑定的路由键是相同的,发送消息时用路由键匹配,只要路由键匹配的队列都能收到消息

主题模式

和路由模式相似,但是是绑定的topic交换机,生产者发送消息时,交换机根据路由键来匹配队列,但是可以根据通配符来模糊匹配,发送到匹配的队列中

代码如下:

@Configuration
public class ThemeConfig {

	//绑定第一个队列,队列名为rabbit.topic.queue.theme,路由键为rabbit.topic.routingKey.theme
    @Bean
    public Binding bindingThemeExchange(){
        return BindingBuilder.bind(themeQueue()).to(themeExchange()).with(RabbitEnum.THEME.getRoutingKey());
    }

	//绑定第二个队列,队列名为rabbit.topic.queue.theme2,路由键为rabbit.topic.routingKey.theme.#
    @Bean
    public Binding bindingThemeExchange2(){
        return BindingBuilder.bind(themeQueue2()).to(themeExchange2()).with("rabbit.topic.routingKey.theme.#");
    }
	//绑定第三个队列,队列名为rabbit.topic.queue.theme3,路由键为rabbit.topic.routingKey.theme.*
    @Bean
    public Binding bindingThemeExchange3(){
        return BindingBuilder.bind(themeQueue3()).to(themeExchange3()).with("rabbit.topic.routingKey.theme.*");
    }
}

通配符用法:
#:可以匹配任意数量的分隔位
*:能匹配一个分隔位
例如 a.*只能匹配到a.XXX,而不能匹配到a.XXX.XXX 而a.#可以匹配到a.XXX.XXX

生产者和消费者代码:

	/**
     * 生产者,发送消息
     * @param routingKey 要发送到的路由键
     */
    public void theme(String routingKey){
        //#:代表一个单词
        //*:代表多个单词
        System.out.println("发送消息");
        rabbitTemplate.convertAndSend(RabbitEnum.THEME.getTopicExchange(),routingKey,"主题");
    }

	/**
     * 消费者一
     */
    @RabbitListener(queues = "rabbit.topic.queue.theme")
    public void consume(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("theme收到消息:"+str);
        channel.basicAck(tag,true);
    }

    /**
     * 消费者二
     */
    @RabbitListener(queues = "rabbit.topic.queue.theme2")
    public void consume2(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("theme2收到消息:"+str);
        channel.basicAck(tag,true);
    }
    /**
     * 消费者三
     */
    @RabbitListener(queues = "rabbit.topic.queue.theme3")
    public void consume3(String str, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("theme3收到消息:"+str);
        channel.basicAck(tag,true);
    }

我们开始测试,如图:

我们请求附带的路由键为:rabbit.topic.routingKey.theme.prqfasfasf
这时消费到这条消息的消费者分别为消费者二和消费者三


这时如果我们把路由键改为:rabbit.topic.routingKey.theme.prqfasfasf.aebs

那么就只有消费者二收到消息了

RPC模式

以上便是全部代码了,RPC模式我准备在工作空闲时间再另写一篇文章

完整代码

这是我边实践,边写的文章,理解也随着实践过程变化,如果有地方写错了,文章格式需要改进的话欢迎大家在评论区指出
https://github.com/linFeng185/TestProjects/tree/master/rabbit


本文链接: http://www.dtmao.cc/news_show_300225.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?