logo头像

👨‍💻冷锋のIT小屋

Spring AMQP注解的使用

上一篇 Spring AMQP简介和使用,我们介绍了Spring AMQP的一些基本要素和概念,也通过一些示例代码介绍了消息的发送和接收,但是都是使用的原始编码方式来实现,并不依赖Spring环境。其实,Spring AMQP也支持使用注解的方式来进行异步接收消息,极大的简化了编码。

1. hello world

要使用注解,首先需要在Spring应用环境中,我们看一个最简单的demo:

1、重新新建一个Spring boot工程,添加如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>

2、新建一个Spring配置类RabbitConfiguration,用来申明Bean:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
public class RabbitConfiguration {
public static final String ANONYMOUS_QUEUE_NAME = "spring.amqp.anonymous.queue";

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
cachingConnectionFactory.setUsername("admin");
cachingConnectionFactory.setPassword("123456");
return cachingConnectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}

@Bean
public Queue anonymousQueue() {
// 匿名队列
return new AnonymousQueue(() -> ANONYMOUS_QUEUE_NAME);
}
}

配置类中,创建了ConnectionFactoryAmqpAdminAnonymousQueueRabbitTemplate等Bean,Spring容器启动后就可以使用它们了。

3、创建测试类HelloWorldDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class HelloWorldDemo {
@Resource
private RabbitTemplate rabbitTemplate;

public void send(String msg) {
rabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME, msg);
System.err.println("Send : " + msg);
}

@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME)
public void receive(String msg) {
System.err.println("Received : " + msg);
}
}

重点来了,首先该类被标注了@Component注解,表示其受Spring容器管理;其次,通过@Resource注解注入了rabbitTemplate Bean,并使用它来发送String类型的消息;第三,在消息接收的方法receive上标注了@RabbitListener注解,稍后再来看这个注解。

4、创建Spring boot工程启动类SpringAmqpApplication

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootApplication
@EnableRabbit
public class SpringAmqpApplication {

public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(SpringAmqpApplication.class, args);

HelloWorldDemo helloWorldDemo = context.getBean(HelloWorldDemo.class);
helloWorldDemo.send("hello world!");
helloWorldDemo.send("hi, belonk!");
helloWorldDemo.send("张三");
}
}

启动应用,可以看到成功接收了消息。

要启动注解的消息监听,需要在配置类上加上@EnableRabbit注解。

2. RabbitListener

通过一个例子来了解一下@RabbitListener的作用:

1
2
3
4
5
6
7
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
}

上边的实例使用了 @RabbitListener注解来监听名为“myQueue”的队列,只要该队列有消息可用,则会交给 processOrder方法处理,但是要确保该队列存在并绑定到了exchange上。使用该注解标记的方法或类,都会使用 RabbitListenerContainerFactory为其创建一个容器,上篇提过,异步消息监听的容器 MessageListenerContainer是由改类创建的。可见,Spring AMQP的注解消息监听是采用异步的方式, @RabbitListener注解是由 RabbitListenerAnnotationBeanPostProcessor处理的。

@RabbitListener注解标记的方法可以支持集中类型的参数:

  • com.rabbitmq.client.Channel:访问Channel

  • org.springframework.amqp.core.Message:接收的消息对象

  • 实体对象:消息中对应的负载的实体对象,自动推导负载实体

  • org.springframework.messaging.Message:Spring-messaging中的消息对象

  • @Payload:标记在消息负载实体上,明确指定消息的负载对象

  • @Header:获取特定一个消息头内容

  • @Headers:标注在一个Map上,用来获取所有消息头

  • MessageHeaders:spring-messaging的消息头

还有几个就不在一一列举了,我们看一个获取Channel和header的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void send(String msg) { (1)
rabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME_1, (Object) msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("custom_header", "this is a custom header.");
return message;
}
});
System.err.println("Send : " + msg);
}

@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_1)
public void receive(String msg, Channel channel, @Header("custom_header") String header) { (2)
System.err.println("Received : " + msg);
System.err.println("Header : " + header);
System.err.println("Channel : " + channel.getChannelNumber());
}
1 send方法设置了一个名为custom_header的自定义消息头
2 receive方法通过@Header来获取,并添加Channel对象。

再看一个@Payload标注的例子:

1
2
3
4
5
6
@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_2)
public void receive(@Payload User user, Channel channel, @Header("custom_header") String header) { (1)
System.err.println("Received : " + user);
System.err.println("Header : " + header);
System.err.println("Channel : " + channel.getChannelNumber());
}
1 这里的User参数可以根据消息自动推导,可以不加上@Payload注解。
注意
User必须实现Serializable接口,上一篇已经提到过,默认的消息转换器是使用的SimpleMessageConverter,它只能处理java序列化对象、Stringbyte[]

定义绑定和Exchange

@RabbitListener有多个属性,可以用来指定监听的队列、绑定关系,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class MyService {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(String data) {
...
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(String data) {
...
}

}

第一个方法,指定了要监听的队列、Exchange以及routing key,queue和exchange会按需自动申明并绑定;第二个方法,将会申明非持久化、独占的、自动删除的匿名队列并绑定到exchange。

2.1. 监听多个队列

使用queues属性时,可以指定关联容器可以侦听多个队列。也可以使用@Header注解来获取接受消息的队列名称:

1
2
3
4
5
6
7
8
@Component
public class MyService {

@RabbitListener(queues = { "queue1", "queue2" } )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}

也支持spEL(从1.5开始):

1
2
3
4
5
6
7
8
9
@Component
public class MyService {

@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}

}

3. 消息转换

在调用Listener之前,有两个消息转换步骤:

首先,使用 MessageConverter 将传入的 Spring AMQP 的 Message 转换为 Spring messaing 的 Message,此时,MessageConverter 默认使用的是SimpleMessageConverter 实现,它仅仅处理 byte[] 数组与 Stringjava.io.Serializable 之间的相互转换

其次,在调用目标方法时将消息转换为方法参数的类型。此时,MessageConverter 默认使用的是 GenericMessageConverter 实现,它将转换委托给一个转换服务(DefaultFormattingConversionService的实例)。

设置消息转换器:

1
2
3
4
5
6
7
8
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 使用jacson消息转换器
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}

在1.6版本之前,消息与类型的转换需要指定消息头(type)或者ClassMapper,从1.6开始,如果@RabbitListener用在方法上,那么可以根据方法参数类型进行自动推断。

以下代码示例可以定义自己的消息转换器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

...

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
return factory;
}

@Bean
public ConversionService myConversionService() {
DefaultConversionService conv = new DefaultConversionService();
conv.addConverter(mySpecialConverter());
return conv;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

...

}

4. 多方法监听

Spring AMQP 支持同一个监听器调用多个方法,此时@RabbiListener注解标注在类上,多个被调用的方法上标注@RabbitHandler 注解。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RabbitListener(id="multi", queues = "someQueue")
public class MultiListenerBean {
@RabbitHandler
@SendTo("my.reply.queue")
public String bar(Bar bar) {
...
}

@RabbitHandler
public String baz(Baz baz) {
...
}

@RabbitHandler
public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
...
}
}

上边的三个方法都被标注@RabbitHandler注解,表示分别监听消息被转换的负载实体是BarBazQux。需要注意的是,Spring AMQP 必须能够区分根据负载实体来区分不同的方法,即是说,每一个被@RabbitHandler标注的方法必须具有不同的负载类型,要么被@Payload标注出明确的负载实体类型,要么根据参数类型自动推断。

5. 消息回复

如果 ``@RabbitListener``监听的方法返回不为空的值,会根据发送者的消息头的 ``ReplyToAddress``的地址进行消息返回,这是由 ``MessageListenerAdapter``来处理的,如果没有设置,则可以添加 ``@SendTo``注解来定义消息返回的地址。使用 ``@SendTo``注解来表示返回结果需要转换为 ``Message``并发送到指定的回复地址(exchange和routing key)上:
1
2
3
4
5
6
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
……
return status;
}

直接返回Message:

1
2
3
4
5
6
7
8
9
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
……
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}

@SendTo注解的value值用来表示exchange和routing key,格式为:exchange/routingKey,例如:

  • foo/bar - 回复的exchange为foo,routing key为bar

  • foo/ - 回复的exchange为foo,routing key为默认(空的)

  • bar or /bar -回复的outingKey为bar,exchange为默认.

  • / or empty - 回复的exchange和routing key都为默认.

@SendTo也支持spEL:

1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}

引用的方法必须返回String。

现在,我们来编写一个消息确认和回复的demo,结合 上一篇的示例来看看基于注解如何工作:

1、修改RabbitConfiguration:

设置消息确认和返回:

1
2
3
4
5
6
7
8
9
10
11
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("192.168.0.27", 5672);
cachingConnectionFactory.setUsername("admin");
cachingConnectionFactory.setPassword("123456");
// 消息确认
cachingConnectionFactory.setPublisherConfirms(true);
// 消息返回
cachingConnectionFactory.setPublisherReturns(true);
return cachingConnectionFactory;
}

创建设置了回调的RabbitTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Bean
public RabbitTemplate callbackRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
// 消息返回
rabbitTemplate.setMandatory(true);
// 设置消息返回回调,一个RabbitTemplate只能设置一次返回回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
Printer.p("Message returned : " + replyCode + ", " + replyText);
}
});
// 消息确认回调,一个RabbitTemplate只能设置一次确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
Printer.p("Message confirmed : " + ack + ", " + cause + ", " + correlationData);
}
});
return rabbitTemplate;
}

再申明一个匿名的队列,代码就不贴了。

2、新建一个ConfirmAndReturnDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Component
public class ConfirmAndReturnDemo {
@Resource
private RabbitTemplate callbackRabbitTemplate;

private Sender sender = new Sender();

public void send(User user) {
sender.send(user);
Printer.p("Send : " + user);
}

@Component
public class Sender {
public void send(User msg) {
callbackRabbitTemplate.convertAndSend(RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4, (Object) msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("custom_header", "this is a custom header.");
return message;
}
});
Printer.p(this, "Send : " + msg);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4, durable = "false", autoDelete = "true", exclusive = "true"),
exchange = @Exchange("exist.exchange"), key = "exist.routingKey"
)
})
public void receiveReply(User user) {
Printer.p(this, "Received reply : " + user);
}
}

@Component
class Consumer {
@RabbitListener(queues = RabbitConfiguration.ANONYMOUS_QUEUE_NAME_4)
// 回复到默认的队列
// @SendTo
// 回复到不存在的exchange和routingkey
// @SendTo("dontExist.exchange/dontExist.routingKey")
// 回复到存在的exchange和routingkey
@SendTo("exist.exchange/exist.routingKey")
// 回复到默认的exchange和不存在的routingkey
// @SendTo("dontExist.routingKey")
public User receive(@Payload User user) {
Printer.p(this, "Received : " + user);
user.setName("王五改名字了");
return user;
}
}
}

这里用了两个内部类来创建生产者和消费者,生产者除了发送消息,还将监听回复的消息,只是exchangerouting key不同;消费者接收消息,并修改了Username属性,然后返回,接收方法上标注了@SendTo,指定exchange和routing key,这里可以设置多种情况来验证回调方法的执行情况。

3、启动类编写demo执行代码:

1
2
ConfirmAndReturnDemo confirmAndReturnDemo = context.getBean(ConfirmAndReturnDemo.class);
confirmAndReturnDemo.send(new User("王五"));

4、运行程序,可以看到控制台输出如下:

1
2
3
4
5
[Sender] Send : User(name=王五)
Send : User(name=王五)
Message confirmed : true, null, null
[Consumer] Received : User(name=王五)
[Sender] Received reply : User(name=王五改名字了)

Sender已经成功接收了回复消息。

6. 定义元注解

@RabbitListener可以用在注解上,来自定义元注解信息:

1
2
3
4
5
6
7
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public @interface MyAnonFanoutListener {
}

可以看到,只需要在自定义注解上适用Spring AMQP 的注解即可,使用时:

1
2
3
4
5
6
7
8
9
10
11
public class MetaListener {
@MyAnonFanoutListener
public void handle1(String foo) {
...
}

@MyAnonFanoutListener
public void handle2(String foo) {
...
}
}

7. 总结

本文的示例代码见 github,总结一下:

1、@RabbitListener是消息异步监听的基本注解,可以定义监听的队列、队列绑定关系

2、@RabbitHandler用在需要监听多个方法时,不同的方法接收不同的消息实体,必须能够明确区分不同的实体,否则消息不能监听成功

3、@SendTo用于设定消息回复,标注的方法需要返回非空的回复实体对象

4、@EanableRabbit用来启用注解消息监听

5、@Header@Headers@Payload等用在方法签名上,用来获取消息头信息或者明确表明消息的负载实体(也可以自动推导)。

支付宝打赏 微信打赏

赞赏是不耍流氓的鼓励