logo头像

👨‍💻冷锋のIT小屋

使用Springboot开发websocket程序(四)——使用RabbitMQ作为STOMP消息代理

上一篇,我们在介绍了Spring中如何使用websocket的子协议stomp,并使用简单的基于内存的stomp消息代理来编写了一个web聊天室实例。基于内存的stomp消息代理,虽然能够满足基本需求,但还是存在一些不足,比如由于stomp代理在应用内部,多个外部websocket应用需要消息互通,那么就难以满足了。在本篇,我们来学习如何使用RabbitMQ作为stomp代理。

1. 为何要使用外部消息代理

简单消息代理,能够满足单websocket应用的需要,但是如果有多个websocket应用,他们之间需要进行消息共享,那么就需要做大量的工作才能实现了。其实,MQ一个最重要的作用就在于能个在各个系统间解耦。引入外部MQ作为stomp消息代理,很好的解决了多系统消息共享的问题,只要其支持stomp协议。RabbitMQ本身提供了对STOMP的支持,加上后结构变化如下:

456cbe0c6ec64b8cbeba7e2171e141f2

前边的是单应用时的结构,后边为怎么了RabbitMQ过后,多个应用程序结构。

2. RabbitMQ对STOMP的支持

RabbitMQ对stomp协议的支持是通过插件的方式,默认stomp插件是关闭的,我们需要先启用之。

2.1. 启用插件

进入rabbitmq所在服务器,然后控制台输入如下命令来启用stomp插件:

``rabbitmq-plugins enable rabbitmq_stomp``

然后可以查看插件是否启用成功:

``rabbitmq-plugins list``

2.2. 插件配置

默认情况下,STOMP将会监听61613端口,默认的用户名和密码都为guest。通过配置文件来配置: ubuntu下rabbitmq的配置文件在/etc/rabbitmq/rabbitmq.conf,找到stomp开头的选项,就可以进行配置了

b476e99c6fbd4d0cbe1509478832e955

比如配置STOMP监听端口:

``stomp.listeners.tcp.1 = 12345``

RabbitMQ中STOMP适配器连接时如果用户名和密码使用默认的guest/guest,则可以忽略,如果需要修改,则配置如下:

1
2
stomp.default_user = guest
stomp.default_pass = guest

2.3. Destinations

STOMP规范并没有规定消息代理来支持什么样的目的地(destination),只是根据消息头的destination的值来判断消息发送的目的地,一般由消息代理自定义支持,RabbitMQ中定义了几种destination类型:

  • #exchange[/exchange]: 发送到任意的routing key和订阅任意的binding key

  • #queue[/queue]: 发送和订阅队列,该队列由STOMP管理

  • #amqqueue[/amq/queue]: 发送和订阅外部创建的队列

  • #topic[/topic]: 发送和订阅到topic

  • #temptopic[/temp-queue/]: 创建临时的队列(使用reply-to请求头)

现在,我们结合代码来看看Spring中对RabbitMQ的这几类destination是如何支持的。

3. Spring中使用RabbitMQ消息代理

我们通过一个demo来看看如何在Spring中使用RabbitMQ支持的这几个destination,整体界面如下;

4e618f6c32204829ac1b9b8c8513ddc4

下边的示例仅贴上部分关键代码,完整的代码可以参看文末的源码。

首先,我们创建一个名为03-websocket-stomp-rabbitmq的springboot工程,引入如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-net</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.33.Final</version>
</dependency>
<dependencies>

这里需要引入reactor库,注意版本的对应,启用响应式编程的支持,否则会出现如下错误:

1
2
3
4
5
6
Caused by: java.lang.ClassNotFoundException: reactor.io.codec.Codec
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_171]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_171]
... 29 common frames omitted

Spring启用Stomp外部消息代理很简单,配置类跟上一篇大致相同,注册stomp端点:

1
2
3
4
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").withSockJS();
}

然后,把简单stomp代理换成外部stomp代理,只需要修改一下配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry
// .setPathMatcher(new AntPathMatcher("."))
.setApplicationDestinationPrefixes("/app");
registry.enableStompBrokerRelay("/exchange", "/topic", "/queue", "/amq/queue")
.setRelayHost("192.168.0.27")
.setRelayPort(61613)
// 配置发送消息到stomp代理的系统共享连接的账号密码,默认是guest/guest
.setSystemLogin("admin")
.setSystemPasscode("123456")
// 配置客户端连接到stomp代理的账号和密码,默认是guest/guest
.setClientLogin("admin")
.setClientPasscode("123456")
;
}

我们这里仅测试/exchange/topic/queue/amq/queue几个destination。

首先,在配置中调用registry.enableStompBrokerRelay(……)方法来配置支持的代理消息destination的前缀,这些前缀由外部消息代理自定义,不同的外部MQ都有自己对STOMP特殊的支持,比如RabbitMQ,它就支持开篇说的这几种destination前缀。

然后,我们分别配置了服务端和客户端的管理账号和密码,其实就是RabbitMQ可用来管理的账号和密码,例如创建队列、交换机等。

工程准备好,我们再看看rabbitmq的这几种stomp destination支持。

3.1. Exchange Destinations

这种是面向交换机的,其destination的格式为:/exchange/<name>[/<pattern>]

这种destination会为每一个订阅者创建一个新的队列并使用给定的<pattern>作为routing key来绑定到exchange,但是stomp不会自动创建exchange,需要在外部自己创建。

  • 订阅者:

    • 创建一个独占的、自动删除的队列,绑定到名称为<name>的交换机,如果<pattern>有值,则使用其作为key绑定到exchange

    • 订阅创建的queue

  • 发送者:使用<routing-key>作为路由key发送消息到名为<name>的交换机

我们先回顾一下RabbitMQ中exchange的常用几种类型:

98af8e2fb72b4f7bbf9b4e26feceac20
  • direct:消息发送者的routing key和绑定到exchange的queue的routing key必须精确匹配,意思就是必须一致

  • topic:可以支持通配符匹配,表示匹配所有,表示匹配一个单词,如lazy.、*.orange.*

  • fanout:对所有绑定的队列进行广播

<p class="x-box-title x-padding-top-10">如果对rabbitmq还不了解的,可以看这几篇:

ok,现在我们来模拟一个场景:有两个客户端,分别订阅了stomp的/exchange/<name>/animal.#和/exchange/<name>/animal.* 两个destination,客户端分别发送消息到服务端,然后服务端转发到具体的destination上,分别来测试客户端收到的消息情况。

我们来看看该场景的测试代码(省略基础代码,见文末源码)。

首先,在RabbitMQ中创建一个topic类型的exchange,其他类型的同理。

然后,在websocket服务端,我们定义如下两个消息处理方法:

1
2
3
4
5
6
7
8
9
10
@MessageMapping("/send2mifei")
@SendTo("/exchange/" + EXCHANGE_TOPIC_NAME + "/animal.rabbit.mifei")
public String exchange1(String content) {
return "destination : " + "/exchange/" + EXCHANGE_TOPIC_NAME + "/animal.rabbit.mifei" + ", content : " + content;
}
@MessageMapping("/send2peppa")
@SendTo("/exchange/" + EXCHANGE_TOPIC_NAME + "/animal.pig")
public String exchange2(String content) {
return "destination : " + "/exchange/" + EXCHANGE_TOPIC_NAME + "/animal.pig" + ", content : " + content;
}

exchange1exchange2方法用来接收客户端发出来的消息,并转发到具体的destination上,这里为animal.rabbit.mifeianimal.pig,正常情况下,订阅了animal.#的客户端都能收到消息,而订阅了animal.*的客户端只能收到exchange2转发的消息。

客户端关键的代码如下:

订阅地址设置:

1
2
3
4
5
6
7
8
9
10
11
<label>订阅地址:</label>
<select id="subscribe-uri">
<option></option>
<option value="/exchange/ws.rabbit.exchange.topic/animal.#">
/exchange/ws.rabbit.exchange.topic/animal.#
</option>
<option value="/exchange/ws.rabbit.exchange.topic/animal.*">
/exchange/ws.rabbit.exchange.topic/animal.*
</option>
</select>
<button id="subscribe">订阅</button>

订阅和消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
div.find('#subscribe').click(function () {
let su = div.find('#subscribe-uri').val();
if (su) {
showMessage('已经订阅:' + su);
// 订阅聊天内容
ws.subscribe(su, function (data) {
showMessage(data.body);
});
}
});

div.find('#send1').click(function () {
if (div.find('#content').val()) {
if (type === 1) {
ws.send('/app/send2mifei', {}, div.find('#content').val());
} else if (type === 4) {
ws.send('/app/topic/debug', {}, div.find('#content').val());
}
}
});
div.find('#send2').click(function () {
if (div.find('#content').val()) {
if (type === 1) {
ws.send('/app/send2peppa', {}, div.find('#content').val());
} else if (type === 4) {
ws.send('/app/topic/info', {}, div.find('#content').val());
}
}
});

测试结果跟我们预期相同。

db3da4b485fc42d1b9cca503aa3a6863

一句话概括:/exchange类型的destination,外部必须先有exchange,stomp不会创建,在客户端订阅时,stomp都会为之创建一个临时的、自动删除的队列,并根据routing key绑定到exchange上。

3.2. Queue Destinations

面向队列,消息发送到默认的交换机(名称为"")。

格式:/queue/<name>

Queue destinations会发送消息给至少一个订阅者,如果没有订阅者,那么消息会一直排队等待订阅者消费

  • 订阅者:订阅时如果队列不存在则创建名为<name>的共享队列,并订阅,队列默认是持久化、非独占、非自动删除的,多个订阅者会轮流接收消息

  • 发送者:发送消息时如果队列不存在则创建,消息会通过默认的exchange发送到共享队列

来看看测试代码。

服务端:

1
2
3
4
5
@MessageMapping("/queue")
@SendTo("/queue/" + QUEUE_NAME)
public String queue(String content) {
return "destination : " + "/queue/" + QUEUE_NAME + ", content : " + content;
}

客户端:

启动多个客户端,它们都订阅/queue/<name>,然后发送任意消息到服务端。可以看到,客户端之间轮流接收消息。

一句话概括:/queue destination必须发送消息到一个客户端,没有客户端订阅则消息排队,消息发送者、订阅者都可以创建持久化的、非自动删除的队列,只要它不存在就创建,存在则用之,多个客户端轮流接收消息。

3.3. AMQ Queue Destinations

这种destination跟/queue相似,唯一的不同是,stomp订阅者和发送者都不负责创建队列,没有队列则出错,主要用于发送和订阅已经存在的队列.

格式:/amq/queue/<name>

  • 发送者:消息通过默认的exchange发送到队列,

  • 订阅者:通过STOMP创建对该队列的订阅,多个订阅者会轮流接收消息

现在rabbitmq创建一个队列。

服务端:

1
2
3
4
5
@MessageMapping("/amq/queue")
@SendTo("/amq/queue/" + AMQ_QUEUE_NAME)
public String amqQueue(String content) {
return "destination : " + "/amq/queue/" + AMQ_QUEUE_NAME + ", content : " + content;
}

客户端订阅/amq/queue/<name>。

启动多个客户端,发送任意消息,消息同样是轮流发送给各个客户端的。如果队列不存在,后台出现错误信息:

2019-11-12 14:36:52.859 ERROR 40283 --- [eactor-tcp-io-3] o.s.m.s.s.StompBrokerRelayMessageHandler : Received ERROR {message=[not_found], content-type=[text/plain], version=[1.0,1.1,1.2], content-length=[68]} session=m0dxmmi0 text/plain payload=NOT_FOUND - no queue 'ws.rabbit.amq.queue.destination' in vhost '/'

一句话概括:/amq/queue用来处理队列已经存在的情况,必须发送消息到一个客户端,没有客户端订阅则消息排队,消息发送者、订阅者不会创建队列,队列不存在则报错,消息同样是轮流发给各个客户端

3.4. Topic Destinations

STOMP最常用的destination类型,发送者和订阅者可以通过routing key进行匹配。如果发出的消息没有订阅者,消息会被丢弃。

格式:/topic/<name>,<name>为绑定的routing key

  • 发送者:消息通过<name>的routing key发送到rabbitmq的amq.topic交换机

  • 订阅者:创建并订阅自动删除的、非独占的队列,该队列按照<name>的routing key绑定到amq.topic交换机上

默认的amq.topic交换机可以通过配置更改为自定义名称:

``stomp.default_topic_exchange = some.exchange``

来看测试场景:客户端订阅不同的destination,后台接收消息并转发到不同的地址上,测试消息接收情况。

服务端:

1
2
3
4
5
6
7
8
9
10
@MessageMapping("/topic/debug")
@SendTo("/topic/" + "*.debug.*")
public String topicTest1(String content) {
return "destination : " + "/topic *.debug.*, content : " + content;
}
@MessageMapping("/topic/info")
@SendTo("/topic/" + "*.info.*")
public String topicTest2(String content) {
return "destination : " + "/topic *.info.*, content : " + content;
}

客户端自主选择订阅/topic/.debug./topic/.info.

启动多个客户端,然后发送信息到topicTest1和topicTest2,订阅了/topic/.debug.能收到topicTest1转发d的消息,而另外一个能收到topicTest2发送的消息。

/topic自动为每个订阅者创建临时队列,如果消息没有订阅者接收则自动丢弃。

ae89695a76454254ad8149285274d310

topic应用最广泛,可以根据订阅者和消息发送者的routing key进行匹配。另外,topic也支持持久化的订阅,客户端断开连接后消息不会丢失,不过需要通过stomp请求头增加头信息来实现,这里就不做介绍了。

3.5. Temp Queue Destinations

临时队列目标允许在SEND frame的reply-to请求头来定义临时destination。临时队列受消息代理管理,他们通过各自的session来唯一区分,所以队列名称可以相同。

要是用临时队列,只需在SEND frame时设置reply-to请求头,规定该值必须以/temp-queue开头:

1
2
3
4
5
SEND
destination:/queue/reply-test
reply-to:/temp-queue/foo

Hello World!

上边的frame会创建一个临时队列(名称自动生成),然后session私有并自动订阅,不同的session会创建不同的队列。

临时队列使用较少,这里也不做深入研究了。

4. 总结

基于内存的stomp消息代理能够满足单应用需求,引入外部stomp消息代理解决了多应用之间的websocket消息传递需求。不同的消息中间件,都会按照stomp规范定义自身的destination支持。开发者需要明白两个点:订阅者订阅了什么destination,发送者发送到什么destination,这两个地址的匹配规则是什么。

源码地址: 见GITHUB

支付宝打赏 微信打赏

赞赏是不耍流氓的鼓励