logo头像

👨‍💻冷锋のIT小屋

Spring AMQP消息转换

上一篇,我们介绍了如果使用Spring AMQP注解来实现消息发送和监听,示例都是使用的默认的消息转换器,即SimpleMessageConverter,它只能处理byte[]String、java序列化对象(实现了Serializable接口的对象)。

通常,不推荐使用Java序列化,因为它存在与Java对象强耦合、依赖java语言等缺点,Spring AMQP也提供了其他的消息转换方式,在本篇,我们将重点来看看如果将消息序列化为JSON格式。

1. MessageConverter

Spring AMQP消息转换定义了顶层接口MessageConverter,它的定义如下:

1
2
3
4
5
6
7
8
public interface MessageConverter {
// 将对象转换为Message对象,支持自定义消息属性
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException
;

// 将Message转换为对象
Object fromMessage(Message message) throws MessageConversionException;
}

它定义了两个方法:将对象转换为Message,将Message转换为对象。

同时,在AmqpTemplate中定义了便捷的消息转换和发送的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException
;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException
;

void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException
;

void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor)
throws AmqpException
;

同样,还定义消息接收并转换为对象的方法:

1
2
3
Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

这些方法并不直接处理封装的Message对象,而是根据设定的MessageConverter来处理POJO,例如发送时将POJO转换为Message再发送,又或者接收Message并将body转换为POJO,当然,它还会设置合适的MessageProperties

MessageConverter有几种实现,我们来简单看一下。

2. SimpleMessageConverter

这是MessageConverter的默认实现,它只能处理byte[]String、java序列化对象(实现了Serializable接口的对象)。

2.1. Message转换为POJO

消息底层存储的都是字节数组,从字节数组转换为Java对象有以下几种情况:

  • 字符串:消息的content-type为text开头,则转换为字符串,默认使用的字符串编码为UTF-8,可以通过defaultCharset指定

  • Java序列化:消息的content-typeapplication/x-java-serialized-object,则SimpleMessageConverter尝试将字节数组反序列化为Java对象。虽然这对于简单的原型开发可能很有用,但是通常不建议依赖Java序列化,因为它会导致生产者和消费者之间的紧密耦合

2.2. POJO转换为Message

同样,SimpleMessageConverter将对象转换为Message时,本质上是将其转换为字节数组,支持字符串、Java可序列化对象、字节数组,除此之外的数据类型Message的body将被设置为null。

3. SerializerMessageConverter

这个转换器类似于SimpleMessageConverter,但它依赖spring框架的SerializerDeserializer接口,而不是java的Serializable接口。

4. Jackson2JsonMessageConverter

文章开头提过,通常不推荐使用Java序列化,因为它存在与Java对象强耦合、依赖java语言等缺点,一种替代方案是使用JSON序列化,Spring AMQP提供了Jackson2JsonMessageConverter转换器。

4.1. POJO转为Message

由于RabbitTemplate默认使用的SimpleMessageConverter,所以我们需要将其替换为Jackson2JsonMessasgeConverter,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public RabbitTemplate jsonRabbitTemplate() {
RabbitTemplate jsonRabbitTemplate = new RabbitTemplate(connectionFactory());
jsonRabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return jsonRabbitTemplate;
}

@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
return new Jackson2JsonMessageConverter();
}

Jackson2JsonMessasgeConverter会根据接收消息的POJO类型来进行自动推断转换类型,同时,也支持手动指定转换类型,只需为其设置一个名为ClassMapper的实体对象,用来明确标记JSON转换的对象,它的定义如下:

1
2
3
4
5
6
7
public interface ClassMapper {
// 将给定的clazz放入消息头
void fromClass(Class<?> clazz, MessageProperties properties);

// 根据消息头获取转换目标class
Class<?> toClass(MessageProperties properties);
}

它是一个策略接口,表明根据MessageProperties中的特定消息头来获取转换的Class的策略,默认的策略是DefaultClassMapper,它的特定消息为TypeId,一个设定ClassMapper的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 申明json转换器,及其转换类型,默认为SimpleMessageConverter
Jackson2JsonMessageConverter messageConverter = (Jackson2JsonMessageConverter) messageConverter();
messageConverter.setClassMapper(classMapper());
template.setMessageConverter(messageConverter);
return template;
}

@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("foo", Foo.class);
idClassMapping.put("bar", Bar.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}

发送消息时需要为MessageProperties指定type属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void runDemo() throws Exception {
String json = "{\"foo\" : \"value\" }";
Message jsonMsg = MessageBuilder.withBody(json.getBytes(Charset.forName("utf-8")))
.andProperties(MessagePropertiesBuilder.newInstance().setContentType("application/json").build())
.build();
jsonMsg.getMessageProperties().setHeader(DefaultClassMapper.DEFAULT_CLASSID_FIELD_NAME, "foo");
this.jsonRabbitTemplate.send(JSON_MESSAGE_QUEUE, jsonMsg);
}

@RabbitListener(queues = JSON_MESSAGE_QUEUE)
public void listenForJsonMessage(Foo foo) {
System.out.println("listenForJsonMessage : Expected a Foo, got a " + foo);
}

4.2. Message转为POJO

Spring AMQP根据消息的头信息来转换POJO对象,这需要消息的属性MessagePropertiesContent-Type中指定了JSON的格式,例如application/json等。1.6之前的版本,如果头信息中的type信息(一般为type)缺失,将会转换失败;从1.6版本开始,将采用jackson默认的转换行为(一般来说,是将json转换为map),并且,将会根据@RabbitListener标注的方法参数类型来进行自动推断并转换。默认情况下,推断的类型会覆盖请求头设定的(根据type)类型。

还有一些其他的转换器:

MarshallingMessageConverter:根据Spring OXM的MarshallerUnmarshaller接口来转换消息,代理Spring OXM的实现;

ContentTypeDelegatingMessageConverter:根据MessageProperties的contentType代理不同的转换器。

5. JSON转换示例

接下来,我们来编写一个基于注解的消息转换示例:

1、根据上篇的示例工程,在RabbitConfiguration添加jsonRabbitTemplate等设置:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public RabbitTemplate jsonRabbitTemplate() {
RabbitTemplate jsonRabbitTemplate = new RabbitTemplate(connectionFactory());
jsonRabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return jsonRabbitTemplate;
}

@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
return new Jackson2JsonMessageConverter();
}

这里申明了一个名为jsonRabbitTemplate的Bean和jackson2JsonMessageConverter的转换器,并没有为json转换器设置ClassMapper

2、编写JsonMessageDemo测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_5)
public class JsonMessageDemo {

@Resource
private RabbitTemplate jsonRabbitTemplate;

public void send(Object obj) {
Printer.p(this, "Send : " + obj);
this.jsonRabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME_5, obj);
}

@RabbitHandler
public void receive(String json, @Header("contentType") String header) {
Printer.p(this, "content-type: " + header);
Printer.p(this, "Received : " + json);
}

@RabbitHandler
public void receive(Dept dept, @Header("contentType") String header) {
Printer.p(this, "content-type: " + header);
Printer.p(this, "Received : " + dept);
}

@RabbitHandler
public void receive(User user, @Header("contentType") String header) {
Printer.p(this, "content-type: " + header);
Printer.p(this, "Received : " + user);
}
}

这里输出了消息的ConentType,便于验证是否是json格式的消息。

3、启动类Main方法添加测试代码:

1
2
3
4
5
6
// demo6
JsonMessageDemo jsonMessageDemo = context.getBean(JsonMessageDemo.class);
User user = new User("赵六");
jsonMessageDemo.send(user);
jsonMessageDemo.send(new Dept("技术部", user));
jsonMessageDemo.send("this is not json");

由于这里声明了Json转换器,之前的测试代码不能运行,所以需要先注释掉,运行之前的示例只需要注释掉json转换器申明的代码即可。

4、运行main方法,可以看到如下输出:

1
2
3
4
5
6
7
8
9
[JsonMessageDemo] Send : User(name=赵六)
[JsonMessageDemo] Send : Dept(name=技术部, manager=User(name=赵六))
[JsonMessageDemo] Send : this is not json
[JsonMessageDemo] content-type: application/json
[JsonMessageDemo] Received : User(name=赵六)
[JsonMessageDemo] content-type: application/json
[JsonMessageDemo] Received : Dept(name=技术部, manager=User(name=赵六))
[JsonMessageDemo] content-type: application/json
[JsonMessageDemo] Received : this is not json

消息的ContentType都是application/json,并且成功发送,也成功接收。

我们知道(说明在 这里),@RabbitListener注解标注的方法,会使用RabbitListenerContainerFactory为其创建一个容器MessageListenerContainer,容器通过设定的MessageListener来进行异步监听,默认的容器是SimpleMessageListenerContainer,要查看原始的消息内容,我们可以跟踪其executeListener方法,在其父类AbstractMessageListenerContainer中,获取的一个消息如下:

(Body:'{"name":"赵六"}' MessageProperties [headers={TypeId=com.belonk.domain.User}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=spring.amqp.anonymous.queue5, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-ww7B814EdZCwae0T6bWw6g, consumerQueue=spring.amqp.anonymous.queue5])

可以看到,它的body是json字符串,并且自动设置了TypeId消息头为User对象。

6. 总结

本篇先简单介绍到这里,总结一下:

1、Spring AMQP支持多种消息转换方式,最常用的是默认的和JSON,要设置消息转换,需要为RabbitTemplate配置消息转换器;

2、基于注解使用JSON消息转换器,可以根据参数进行转换类型推导,而无需设置消息头的ContentTypeTypeId

3、JSON消息转换器还支持配置ClassMapper来定义转换类型。

本文实例代码见 github

支付宝打赏 微信打赏

赞赏是不耍流氓的鼓励