Springboot map遍历 Pytorch 动态条形图 encryption redux rspec ai视频教程下载 js的点击事件 mysql降序 java数据分析 kubernetes视频 python中的for循环 python基础教程 python正则匹配 javalabel java语言基础教程 java正则表达式用法 java中tostring java格式化日期 linux磁盘管理 python视频教程 flash实例 flash实例教程 狮子狗出装 神龙kms 微信摇骰子表情包 数科阅读器 战地联盟辅助 stretchcolumns extjs视频教程 bz2解压命令 iframe跨域 ps制作表格 一键隐藏 foobar2000插件 jquery手册 dnf选择角色卡死 夜之魇掉落 捷速pdf编辑器
当前位置: 首页 > 学习教程  > 编程语言

RabbitMQ的高级属性详解

2020/11/4 14:48:19 文章标签:

rabbitMQ的高级属性详解 一:消费端消费消息是同步的还是异步的 同步消费(单线程消费),在实际开发中,为了提高消费端的消费效率,会在消费端使用线程池并发消费 1. 服务端 public class Provider {public static void main ( St…

rabbitMQ的高级属性详解

一:消费端消费消息是同步的还是异步的

同步消费(单线程消费),在实际开发中,为了提高消费端的消费效率,会在消费端使用线程池并发消费

1. 服务端

public class Provider {
    public static void main ( String[] args ) throws IOException {
//        获取连接
        Connection connection = ConnectionUtils.getConnection();
//        创建通道
        Channel channel = connection.createChannel();
//        声明队列
        channel.queueDeclare("queueH",false,false,false,null);
//        发送消息到队列
//        String exchange, String routingKey, BasicProperties props, byte[] body)
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("","queueH",null,("hellow"+i).getBytes());

        }

//        关闭连接 连接关闭以后通道会自动关闭
        connection.close();
    }
}

2. 消费端

public class Consumer {

    public static void main ( String[] args ) throws IOException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
//        获取连接
        Connection connection = ConnectionUtils.getConnection();
//        创建通道
        Channel channel = connection.createChannel();
//        声明队列
        channel.queueDeclare("queueH",false,false,false,null);
//        消费消息
        channel.basicConsume("queueH",new DefaultConsumer(channel){
            @Override
            public void handleDelivery ( String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body ) throws IOException {
//                System.out.println(envelope.getDeliveryTag());
                executorService.submit(new Runnable() {
                    @Override
                    public void run () {
                        System.out.println(new String(body));
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });

            }
        });
    }
}

二:消息的过期时间-TTL

2.1 通过队列的方式,设置消息的过期时间(对队列中的所有消息有效)

  HashMap<String, Object> map = new HashMap<>();
        map.put("x-message-ttl",5000);
//        声明队列
   channel.queueDeclare("queueH",false,false,false,map);

2.2 给消息本身设置过期时间

//        声明队列
        channel.queueDeclare("queueH",false,false,false,null);
//        发送消息到队列
//        给消息本身设置过期时间
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("","queueH",properties,("hellow"+i).getBytes());

        }

2.3 队列的过期时间

 HashMap<String, Object> map = new HashMap<>();
//        设置过期时间后,到期后,队列会自动删除
  map.put("x-expires",5000);
//        声明队列
 channel.queueDeclare("queueH01",false,false,false,map);

三: 设置消息的优先级

注意:优先级队列,必须在消息堆积时才有意义,如果消费速度>消息的发布速度,优先级队列是没有任何作用的

public class Provider {
    public static void main ( String[] args ) throws IOException {
//        获取连接
        Connection connection = ConnectionUtils.getConnection();
//        创建通道
        Channel channel = connection.createChannel();
        HashMap<String, Object> map = new HashMap<>();
//        设置队列的最大优先级,这个队列的最大优先级是100
        map.put("x-max-priority",100);
//        声明队列
        channel.queueDeclare("queueH",false,false,false,map);
//        发送消息到队列
        for (int i = 0; i < 100; i++) {
            //        设置1-100的随机数
            int random = (int) (Math.random()*100+1);
//             ,设置消息的优先级
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .priority(random)
                    .build();
            channel.basicPublish("","queueH",properties,("hellow"+random).getBytes());
        }

//        关闭连接 连接关闭以后通道会自动关闭
        connection.close();
    }
}

四:死信队列

什么是死信队列

死信消息:一个普通队列中,如果发生了消息过期的问题(也有可能是其他问题),那么这个消息就会被丢弃,被丢弃的消息就称之为死信消息

死信路由:一个普通的队列,如果绑定了死信路由,则所有的死信消息不会直接被丢弃,而是转发给死信路由

死信队列:和死信路由绑定在一起的队列就称之为死信队列

public class Provider {
    public static void main ( String[] args ) throws IOException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
//        声明一个死信队列,以及一个死信路由,并让死信队列绑定死信路由
        channel.queueDeclare("dead",false,false,false,null);
        channel.exchangeDeclare("deadroute","fanout");
        channel.queueBind("dead","deadroute","");
//        声明一个正常的队列,这个队列设置死信属性
        HashMap<String, Object> map = new HashMap<>();
//        给队列中的所有消息设置一个过期时间,消息到期后,就会变成死信消息
        map.put("x-message-ttl",7000);
        map.put("x-dead-letter-exchange","deadroute");
        channel.queueDeclare("normal",false,false,false,map);
//        发送消息
        channel.basicPublish("","normal",null,"xiaoxi".getBytes());
    }
}

五:延迟队列

实现:通过TTL+死信队列来实现

代码与模拟的死信队列的代码相同

六:消息的持久化

什么是消息的持久化

消息的持久化也就是说当rabbitmq服务关闭以后,消息不丢失,默认rabbitmq的消息是非持久化的

注意:

1.队列的持久化不意为着消息的持久化

2.队列如果不设置持久化,消息持久化毫无意义

3.消息持久化其实就是将消息写入硬盘,如果所有消息都持久化,必然会降低RabbitMq服务的消息吞吐量,所以实际开发过程中,尽量只让需要的消息持久化

//设置消息的持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties()
        .builder()
        .deliveryMode(2)//消息持久化模式
        .build();

//也可直接使用该常量
//MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish("normal-exchange", "", properties, msg.getBytes("utf-8"));

七: RabbitMQ的消息确认机制

1. 提供者的消息确认机制

1. 通过事务模式

public class Provider01 {
    public static void main ( String[] args ) throws IOException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("queue03",false,false,false,null);
//        通过事务模式实现消息确认机制
//            1. 开启事务
        channel.txSelect();
        try {
//            设置消息的持久化
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .build();
//            第二种消息持久化方式
//            AMQP.BasicProperties properties1 = MessageProperties.PERSISTENT_TEXT_PLAIN;
//            发送消息
//            如果消息发送成功,则提交事务
            channel.txCommit();
        }catch (Exception e){
//            出现异常,事务回滚
         channel.txRollback();
//         进行消息的重试与补偿
        }
    }
}

2.confirm的同步模式

public class Provider02 {
    public static void main ( String[] args ) throws IOException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("queue03",false,false,false,null);
//    开启confirm机制
        channel.confirmSelect();
//        发送消息
//        这里是发送消息
//        同步等到rabbitmq的响应
        boolean b = channel.waitForConfirms();
        if (!b){
//            进行消息的重试和补偿机制
        }
    }
}

3.confirm的异步模式

public class Provider03 {
    public static void main ( String[] args ) throws IOException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("queue03", false, false, false, null);
//    开启confirm机制
        channel.confirmSelect();
//        发送消息
//        这里是发送消息
//        设置异步的监听方法
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 如果消息成功,就会回调该方法
             * @param deliveryTag 表示成功的消息id
             * @param multiple 表示是否批量,如果为true,就说明消息id之前的全部成功,如果为false。说明单个消息成功
             * @throws IOException
             */
            @Override
            public void handleAck ( long deliveryTag, boolean multiple ) throws IOException {

            }

            /**
             * 如果消息失败,就会回调该方法
             * @param deliveryTag 失败的消息id
             * @param multiple 表示是否批量失败,如果为true,批量失败,为false,单个失败
             * @throws IOException
             */

            @Override
            public void handleNack ( long deliveryTag, boolean multiple ) throws IOException {
                
            }
        });
    }

}

八:消息的手动签收和重回队列

在消费端进行手动签收和重回队列

public class Consumer {
    public static void main ( String[] args ) throws IOException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
//        手动确认消息,第二个参数必须设置为fasle
        channel.basicConsume("queueH06",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery ( String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body ) throws IOException {
                System.out.println(new String(body));
//                消息的手动确认
//                channel.basicAck(envelope.getDeliveryTag(),false);
//                消息的重回队列
//                第三个参数,表示是否重回队列,true,重回队列,fasle。变成死信消息
                channel.basicNack(envelope.getDeliveryTag(),false,true);
            }
        });

    }
}

本文链接: http://www.dtmao.cc/news_show_350209.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?