Kafka android开发实战 rinetd 进程 Scala CANopen url scroll vue教学视频 angular视频 kafka默认端口 java三维数组 js获取月份 bitlocker加密好慢 centos查看python版本 idea全文搜索快捷键 flutter项目案例 python编程 mysql学习 python连接mysql数据库 python支持中文 python环境变量配置 python传参 java数组反转 java编程学习 java基本类型 java运行环境配置 java获取当前月份 java有哪些数据类型 java开发入门 java连接sql数据库 java重命名 java怎么输出数组 java抛出自定义异常 linuxsleep p2pover hadoop权威指南 摩尔斯电码翻译器 快捷精灵 全英雄守城战
当前位置: 首页 > 学习教程  > 编程语言

SpringBoot+WebSocket+RabbitMQ(RabbitMQ作为Stomp消息代理)

2020/7/24 10:11:37 文章标签:

RabbitMQ作为Stomp消息代理


一、安装RabbitMQ

1.1、erlang下载

地址:erlang下载地址

1.2、rabbitmq下载

下载地址:rabbitmq(windows版本)下载地址

1.3、启动rabbitmq

  • 1.进入rabbitmq的sbin目录下
  • 2.cmd进入命令行模式
  • 3.rabbitmq-plugins enable rabbitmq_management

1.4、重启rabbitmq

  • 1.进入rabbitmq的sbin目录下
  • 2.cmd进入命令行模式
  • 3.rabbitmq-service stop
  • 4.rabbitmq-service start

1.5、安装插件(stomp)

  • 1.进入rabbitmq的sbin目录下
  • 2.cmd进入命令行模式
  • 3.rabbitmq-plugins enable rabbitmq_stomp
  • 4.rabbitmq-plugins enable rabbitmq_web_stomp

清除所有队列数据

  • rabbitmqctl stop_app
  • rabbitmqctl reset
  • rabbitmqctl start_app

修改端口

  • 1.进入rabbitmq的ebin目录下
  • 2.修改rabbit.app文件
  • 3.修改tcp_listeners里面的端口号5672为5673或你想指定的端口

二、Stomp消息代理编码规范

  在rabbitMQ中合法的前缀有:/temp-queue、/exchange、/topic、/queue、/amq/queue、/reply-queue

  • 1./exchange/< exchangeName >
    对于 SUBCRIBE frame,destination 一般为
    /exchange/< exchangeName >/[/pattern] 的形式。交换机要手动创建,该
    destination 会创建一个唯一的、自动删除的、名为< exchangeName >的 queue,
    并根据 pattern 将该 queue 绑定到所给的 exchange,实现对该队列的消息订
    阅。
    对于 SEND frame,destination 一般为
    /exchange/< exchangeName >/[/routingKey] 的形式。这种情况下消息就会被发
    送到定义的 exchange 中,并且指定了 routingKey。

  • 2./queue/< queueName >
    对于 SUBCRIBE frame,destination 会定义< queueName >的共享 queue,并且实
    现对该队列的消息订阅。
    对于 SEND frame,destination 只会在第一次发送消息的时候会定义
    < queueName >的共享 queue。该消息会被发送到默认的 exchange 中,routingKey
    即为< queueName >。

  • 3./amq/queue/< queueName >
    这种情况下无论是 SUBCRIBE frame 还是 SEND frame 都不会产生 queue。
    但如果该 queue 不存在,SUBCRIBE frame 会报错。
    对于 SUBCRIBE frame,destination 会实现对队列< queueName >的消息订阅。
    对于 SEND frame,消息会通过默认的 exhcange 直接被发送到队列
    < queueName >中。

  • 4./topic/< topicName >
    对于 SUBCRIBE frame,destination 创建出自动删除的、非持久的 queue 并根
    据 routingkey 为< topicName >绑定到 amq.topic exchange 上,同时实现对该
    queue 的订阅。
    对于 SEND frame,消息会被发送到 amq.topic exchange 中,routingKey 为
    < topicName >。

三、代码实现

  • pom.xml
<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.9.RELEASE</version>
</dependency>

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final</version>
</dependency>

<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>0.8.11.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
  • WebSocketConfig.java
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 客户端连接点
        registry.addEndpoint("/websocket")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // registry.enableSimpleBroker("/topic/", "/queue/");
        registry.enableStompBrokerRelay("/topic/", "/queue/", "/exchange")
                .setRelayHost("localhost")
                .setRelayPort(61613)
                .setClientLogin("guest")
                .setClientPasscode("guest")
                .setVirtualHost("/");
        registry.setUserDestinationPrefix("/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

  • WebSocketController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;

@RestController
public class WebSocketController {

    @Autowired
    private SimpMessagingTemplate template;

    /**
     * /exchange/<exchangeName>
     * 交换机模式
     * @param params
     */
//    @MessageMapping("/sendToUser")
//    public void sendToUserByExchange(Map<String, String> params) {
//        String fromUserId = params.get("fromUserId");
//        String toUserId = params.get("toUserId");
//        String msg = "来自" + fromUserId + "消息:" + params.get("msg");
//        String destination = "/exchange/sendToUser/user" + toUserId;
//
//        template.convertAndSend(destination, msg);
//    }

    /**
     * /queue/<queueName>
     * @param params
     */
//    @MessageMapping("/sendToUser")
//    public void sendToUserByQueue(Map<String, String> params) {
//        String fromUserId = params.get("fromUserId");
//        String toUserId = params.get("toUserId");
//        String msg = "来自" + fromUserId + "消息:" + params.get("msg");
//        String destination = "/queue/user" + toUserId;
//
//        template.convertAndSend(destination, msg);
//    }

    /**
     * /topic/<topicName>
     * @param params
     */
    @MessageMapping("/sendToUser")
    public void sendToUserByTopic(Map<String, String> params) {
        String fromUserId = params.get("fromUserId");
        String toUserId = params.get("toUserId");
        String msg = "来自" + fromUserId + "消息:" + params.get("msg");
        String destination = "/topic/user" + toUserId;

        template.convertAndSend(destination, msg);
    }

    @MessageMapping("/sendToAll")
    public void sendToAll(String message) {
        String msg = "消息:" + message;
        String destination = "/topic/chat";

        template.convertAndSend(destination, msg);
    }

}

  • show.html
<!DOCTYPE html>
<html>

<head>
<meta charset="UTF-8">
<title>Floor View</title>
<script src="/js/websocket.js"></script>
<script src="/js/jquery.min.js"></script>
<script src="/js/sockjs.min.js"></script>
<script src="/js/stomp.min.js"></script>
<script id="code">
    var DEBUG_FLAG = true;
    $(function()
    {
        //启动websocket
        connect();
    });

    function sendToAll() {
      var msg = $("#msg").val();
      stompClient.send("/app/sendToAll", {}, msg);
    }

    function sendToUser() {
      var msg = $("#msg").val();
      var toUserId = $("#userId").val();
      var data  = {"fromUserId": userId, "toUserId": toUserId, "msg": msg};
      stompClient.send("/app/sendToUser", {}, JSON.stringify(data));
    }
</script>
</head>

<body style="margin: 0px;padding: 0px;overflow: hidden; ">
  <!-- 显示消息-->
  <textarea id="debuggerInfo" style="width:100%;height:200px;"></textarea>
  <!-- 发送消息-->
  <div>用户:<input type="text" id="userId"></input></div>
  <div>消息:<input type="text" id="msg"></input></div>
  <div><input type="button" value="点对点发送消息" onclick="sendToUser()"></input></div>
  <div><input type="button" value="群发送消息" onclick="sendToAll()"></input></div>
</body>
</html>
  • websocket.js
var stompClient = null;
var wsCreateHandler = null;
var userId = null;

function connect() {
	var host = window.location.host; // 带有端口号
	userId =  GetQueryString("userId");
	var socket = new SockJS("http://" + host + "/websocket");
	stompClient = Stomp.over(socket);
	stompClient.connect({}, function (frame) {
			writeToScreen("connected: " + frame);

			/**
			 * /exchange/<exchangeName>:交换机模式
			 */
			// stompClient.subscribe('/exchange/sendToUser/user' + userId, function (response) {
			// 	writeToScreen(response.body);
			// });

			/**
			 * /queue/<queueName>
			 */
			// stompClient.subscribe('/queue/user' + userId, function (response) {
			// 	writeToScreen(response.body);
			// });

			/**
			 * /topic/<topicName>
			 */
			stompClient.subscribe('/topic/user' + userId, function (response) {
				writeToScreen(response.body);
			});

			stompClient.subscribe('/topic/chat', function (response) {
				writeToScreen(response.body);
			});

		}, function (error) {
			wsCreateHandler && clearTimeout(wsCreateHandler);
			wsCreateHandler = setTimeout(function () {
				console.log("重连...");
				connect();
				console.log("重连完成");
			}, 1000);
		}
	)
}

function disconnect() {
	if (stompClient != null) {
		stompClient.disconnect();
	}
	writeToScreen("disconnected");
}

function writeToScreen(message) {
	if(DEBUG_FLAG)
	{
		$("#debuggerInfo").val($("#debuggerInfo").val() + "\n" + message);
	}
}

function GetQueryString(name) {
	var reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
	var r = window.location.search.substr(1).match(reg); //获取url中"?"符后的字符串并正则匹配
	var context = "";
	if (r != null)
		context = r[2];
	reg = null;
	r = null;
	return context == null || context == "" || context == "undefined" ? "" : context;
}

四、测试

  开启两个页面

  • http://localhost:8080/show.html?userId=1
  • http://localhost:8080/show.html?userId=2

本文链接: http://www.dtmao.cc/news_show_50207.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?