概述

RabbitMQ 是一个由 Erlang 语言开发的 AMQP (Advanced Message Queue) 高级消息队列协议 的开源实现。

名词 概念
Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Connection 网络连接,比如一个TCP连接。
Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Broker 表示消息队列服务器实体。

准备工作

安装

Windows安装

安装RabbitMQ之前必须安装 Erlang 运行环境。

  • 前往Erlang官网下载 Windows 64-bit Binary File版本,一直下一步
  • 配置环境变量 ERLANG_HOME 为安装目录 D:\esl24.0
  • PATH环境变量中添加 %ERLANG_HOME%\bin

安装RabbitMQ

  • 前往RabbitMQ官网下载 .exe安装包,一直下一步
  • 配置环境变量RABBITQM_SERVER 为安装目录 D:\rabbitmq_server-3.9.4
  • PATH环境变量中添加 %RABBITQM_SERVER%\sbin
  • 安装rabbitmq_management插件 rabbitmq-plugins enable rabbitmq_management
  • 启动,并访问 http://localhost:15672
  • 默认账号密码都是guest

docker安装

1
docker pull rabbitmq
1
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

设置默认用户名和密码可以使用下面的方式

1
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

Java中使用

JavaClient

这里只是写了些基本操作的测试代码,详解请参考Java客户端接口(API)指南

单生产单消费【默认交换机】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.coderxi.util.AMQPUtil;
import com.rabbitmq.client.DefaultConsumer;
import org.junit.jupiter.api.Test;

import java.util.Scanner;

class Test1Simple {

String queue = "simple_queue";

@Test
void createQueue() {
AMQPUtil.test(channel -> {
//创建一个队列(不指定交换机)
channel.queueDeclare(queue, true, false, false, null);
});
}

@Test
void publish() {
AMQPUtil.test(channel -> {
//向队列发布消息(不指定交换机)
String message = "Hello,world!";
channel.basicPublish("", queue, null, message.getBytes());
});
}

@Test
void consume() {
AMQPUtil.test(channel -> {
//创建一个消费者
DefaultConsumer consumer = AMQPUtil.newConsumer("消费者",channel);
//消费
channel.basicConsume(queue, true, consumer);
//阻塞,防止进程结束
System.in.read();
});
}
}

单生产多消费(轮巡消费)【默认交换机】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.coderxi.util.AMQPUtil;
import com.rabbitmq.client.DefaultConsumer;
import org.junit.jupiter.api.Test;

import java.util.Scanner;

public class Test2Work {

String queue = "work_queue";

@Test
void createQueue() {
AMQPUtil.test(channel -> {
//创建一个队列(不指定交换机)
channel.queueDeclare(queue, true, false, false, null);
});
}

@Test
void publish() {
AMQPUtil.test(channel -> {
//向队列发布消息(不指定交换机)
String message = "Hello,world!";
channel.basicPublish("", queue, null, message.getBytes());
});
}

@Test
void consume() {
AMQPUtil.test(channel -> {
//创建三个消费者
//会按轮巡形式消费,每次发布消息只有一个消费者消费
for (int i = 1; i <= 3; i++) {
DefaultConsumer consumer = AMQPUtil.newConsumer("消费者" + i, channel);
channel.basicConsume(queue, true, consumer);
}
System.in.read();
});
}
}

发布/订阅模式【FANOUT交换机】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.coderxi.util.AMQPUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.DefaultConsumer;
import org.junit.jupiter.api.Test;

import java.util.Scanner;

public class Test3PublishAndSubscribe {

String
exchange = "subscribe_exchange",
queue1 = "subscribe_queue_1",
queue2 = "subscribe_queue_2";

@Test
void createQueue() {
AMQPUtil.test(channel -> {
//创建一个交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
//创建两个队列
channel.queueDeclare(queue1, true, false, false, null);
channel.queueDeclare(queue2, true, false, false, null);
//将两个队列绑定到这个交换机(不指定routingKey)
channel.queueBind(queue1,exchange,"");
channel.queueBind(queue2,exchange,"");
});
}

@Test
void publish() {
//向交换机(绑定的所有队列)发布消息
AMQPUtil.test(channel -> {
String message = "Hello,world!";
channel.basicPublish(exchange, "", null, message.getBytes());
});
}

@Test
void consume() {
AMQPUtil.test(channel -> {
//两个消费者,一个消费队列1的消息,一个消费队列2的消息
//当向交换机发布消息时,每次发布消息时两个消费者都能消费到
DefaultConsumer consumer1 = AMQPUtil.newConsumer("消费者1",channel);
channel.basicConsume(queue1, true, consumer1);
DefaultConsumer consumer2 = AMQPUtil.newConsumer("消费者2",channel);
channel.basicConsume(queue2, true, consumer2);
System.in.read();
});
}

}

路由模式【DIRECT交换机】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import com.coderxi.util.AMQPUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.DefaultConsumer;
import org.junit.jupiter.api.Test;

import java.util.Scanner;

public class Test4Routing {

String
exchange = "routing_exchange",
queue1 = "routing_queue_1",
queue2 = "routing_queue_2",
routingKey = "routing_key";

@Test
void createQueue() {
AMQPUtil.test(channel -> {
//创建一个交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
//创建两个队列
channel.queueDeclare(queue1, true, false, false, null);
channel.queueDeclare(queue2, true, false, false, null);
//将两个队列绑定到这个交换机(指定routingKey)
channel.queueBind(queue1, exchange, routingKey);
channel.queueBind(queue2, exchange, routingKey);
});
}

@Test
void publish() {
AMQPUtil.test(channel -> {
//向交换机内 所有routingKey为"routing_key"的队列 发布消息
String message = "Hello,world!";
channel.basicPublish(exchange, routingKey, null, message.getBytes());
});
}

@Test
void consume() {
AMQPUtil.test(channel -> {
//两个消费者,一个消费队列1的消息,一个消费队列2的消息
//由于队列1和队列2都绑定routingKey为"routing_key"
//当向交换机内 所有routingKey为"routing_key"的队列 发布消息时两个消费者都能消费到
DefaultConsumer consumer1 = AMQPUtil.newConsumer("消费者1", channel);
channel.basicConsume(queue1, true, consumer1);
DefaultConsumer consumer2 = AMQPUtil.newConsumer("消费者2", channel);
channel.basicConsume(queue2, true, consumer2);
System.in.read();
});
}

}

主题模式【TOPIC交换机】
direct是把固定的routingKey跟队列绑定,topic是把模糊的routingKey跟队列绑定

  • * (星号) 可以代替1个单词.
  • # (井号) 可以代替0个或多个单词.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    import com.coderxi.util.AMQPUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.DefaultConsumer;
    import org.junit.jupiter.api.Test;

    import java.util.Scanner;

    public class Test5Topics {

    String
    exchange = "topic_exchange",
    queue1 = "topic_queue_1",
    queue2 = "topic_queue_2",
    //*代表1个单词
    routingKey1 = "topic_key.*.test",
    //#代表0个或多个单词
    routingKey2 = "topic_key.#",
    publishRoutingKey1 = "topic_key.path.test",
    publishRoutingKey2 = "topic_key.path.path";

    @Test
    void createQueue() {
    AMQPUtil.test(channel -> {
    //创建一个交换机
    channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
    //创建两个队列
    channel.queueDeclare(queue1, true, false, false, null);
    channel.queueDeclare(queue2, true, false, false, null);
    //将两个队列绑定到这个交换机(指定routingKey)
    channel.queueBind(queue1, exchange, routingKey1);
    channel.queueBind(queue2, exchange, routingKey2);
    });
    }

    @Test
    void publish() {
    AMQPUtil.test(channel -> {
    //向交换机内 所有routingKey可以匹配"topic_key.path.test"的队列 发布消息
    String message = "Hello,world!";
    channel.basicPublish(exchange, publishRoutingKey1, null, message.getBytes());
    });
    }

    @Test
    void consume() {
    AMQPUtil.test(channel -> {
    //使用publishRoutingKey1时,1和2都会消费,2时只有2会消费
    DefaultConsumer consumer1 = AMQPUtil.newConsumer("消费者1", channel);
    channel.basicConsume(queue1, true, consumer1);
    DefaultConsumer consumer2 = AMQPUtil.newConsumer("消费者2", channel);
    channel.basicConsume(queue2, true, consumer2);
    System.in.read();
    });
    }

    }

客户端远程调用服务端【HEADER交换机】
header模式与routing不同的地方在于,header模式取消routingKey,使用header中的key/value(键值对)匹配队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import com.coderxi.util.AMQPUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.DefaultConsumer;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Scanner;

public class Test6Headers {

String
exchange = "headers_exchange",
queue1 = "headers_queue_1",
queue2 = "headers_queue_2";

@Test
void createQueue() {
AMQPUtil.test(channel -> {
//创建一个交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.HEADERS);
//创建两个队列
channel.queueDeclare(queue1, true, false, false, null);
channel.queueDeclare(queue2, true, false, false, null);
//将两个队列绑定到这个交换机(在routingKey的位置后面指定arguments)
channel.queueBind(queue1, exchange, "",new HashMap<String, Object>() {{
put("type", "headers_type1");
}});
channel.queueBind(queue2, exchange, "",new HashMap<String, Object>() {{
put("type", "headers_type2");
}});
});
}

@Test
void publish() {
AMQPUtil.test(channel -> {
//向交换机内 所有arguments中含有{type:headers_type1}的队列发送消息
String message = "Hello,world!";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(new HashMap<String, Object>() {{
put("type", "headers_type1");
}}).build();
channel.basicPublish(exchange, "", properties, message.getBytes());
});
}

@Test
void consume() {
AMQPUtil.test(channel -> {
//两个消费者,一个消费队列1的消息,一个消费队列2的消息
//当向交换机内发布 headers为{type:headers_type1} 的消息时
//只有消费者1消费
DefaultConsumer consumer1 = AMQPUtil.newConsumer("消费者1", channel);
channel.basicConsume(queue1, true, consumer1);
DefaultConsumer consumer2 = AMQPUtil.newConsumer("消费者2", channel);
channel.basicConsume(queue2, true, consumer2);
System.in.read();
});
}

}

【DIRECT交换机】
声明一个临时、私用、匿名的队列,当服务器计算得到结果的时候会把结果发送到之前声明的匿名队列中。

1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

发布者确认模式(参考二月_春风的笔记绘制的)

从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class AMQPUtil {

public static Connection getConnection() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

public static DefaultConsumer newConsumer(String consumerName, Channel channel) {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-----------------------------------------");
System.out.println("| consumerName: " + consumerName);
System.out.println("| consumerTag : " + consumerTag);
//System.out.println("| envelope : " + envelope);
//System.out.println("| properties : " + properties);
System.out.println("| body : " + new String(body, StandardCharsets.UTF_8));
System.out.println("-----------------------------------------");
}
};
}

public static void test(AMQPTestFunc func) {
try (Connection connection = AMQPUtil.getConnection();
Channel channel = connection.createChannel()) {
func.invoke(channel);
} catch (Exception e) {
e.printStackTrace();
}
}

public interface AMQPTestFunc {
void invoke(Channel channel) throws IOException;
}
}

SpringBoot

  • 相关配置(可不配,下面都是默认值)
    application.yml
    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /

实现Routing模式

com.coderxi.config.AMQPConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AMQPConfig {

public static final String
EXCHANGE = "springboot_exchange",
QUEUE_1 = "springboot_queue_1",
QUEUE_2 = "springboot_queue_2",
ROUTING_KEY = "springboot_routingKey";

//定义交换机
@Bean(EXCHANGE)
Exchange exchange() {
return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
}

//定义两个队列
@Bean(QUEUE_1)
Queue queue1() { return new Queue(QUEUE_1); }

@Bean(QUEUE_2)
Queue queue2() { return new Queue(QUEUE_2); }

//绑定交换机和两个队列
@Bean
Binding binding1(@Qualifier(EXCHANGE) Exchange exchange, @Qualifier(QUEUE_1) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
}
@Bean
Binding binding2(@Qualifier(EXCHANGE) Exchange exchange, @Qualifier(QUEUE_2) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
}

}
com.coderxi.test.Sender
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import com.coderxi.config.AMQPConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class Sender {

@Autowired
RabbitTemplate rabbitTemplate;

@Test
void send(){
rabbitTemplate.convertAndSend(AMQPConfig.EXCHANGE,AMQPConfig.ROUTING_KEY,"Hello,world!");
}

}
com.coderxi.test.Receiver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import com.coderxi.config.AMQPConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

@RabbitListener(queues = AMQPConfig.QUEUE_1)
void receive1(String message) {
System.out.println("queue1: " + message);
}

@RabbitListener(queues = AMQPConfig.QUEUE_2)
void receive2(String message) {
System.out.println("queue2: " + message);
}

}

运行 Sender.send() ,可以看到两个消费者正常消费

延时消息队列

延时队列即放置在该队列里面的消息不需要立即消费的。而是等待一段时间之后取出消费。

  • 电商交易场景交易中超时未支付的订单需要被关闭,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付,如支付未完成,则关闭订单。

TTL DLX实现

TTL 消息存活时间(Time to Live):队列中的消息的存活时间,(或 单条消息的存活时间)
DLX 死信交换机(Dead Letter Exchanges):当消息在一个队列中变成死信(dead letter)之后, 它能被重新publish到另一个交换机, 这个交换机就是DLX

  • 实现思路
    首先我们有一个交换机和一个和它绑定的消息队列,曾经是直接向这个消息队列发送消息
    graph LR P((发送者)) --> 交换机 --> Q(消息队列) --> C((接收者))
    现在我们想等待一段时间再向这个消息队列发送消息,就需要引入一个新的延时消息队列。
    首先将一条设定了存活时间的消息发送给延时消息队列,等这个消息死亡变成死信,发送到原来的交换机以及消息队列即可
    graph LR P((发送者)) --> Q1("延时消息队列") --过了生存时间变成死信--> DLX["交换机<br>(此时作为死信交换机)"] --> Q2(消息队列) --> C((接收者))

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AMQPConfig {

public static final String
EXCHANGE_DLX = "exchange_dlx",
QUEUE_DELAY_MESSAGE = "queue_delay_message",
QUEUE_MESSAGE = "queue_message";

@Bean
Exchange dlxExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_DLX).build();
}

@Bean
Queue delayMessageQueue() {
return
QueueBuilder.durable(QUEUE_DELAY_MESSAGE)
//队列中的消息的存活时间(单位:毫秒)
.withArgument("x-message-ttl",3000)
//队列中的消息死亡后发给的交换机
.withArgument("x-dead-letter-exchange", EXCHANGE_DLX)
//队列中的消息死亡后发给的routingKey/队列
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGE)
.build();
}

@Bean
Queue messageQueue() {
return new Queue(QUEUE_MESSAGE, true);
}

@Bean
Binding binding(Exchange exchange, Queue messageQueue) {
return BindingBuilder.bind(messageQueue).to(exchange).with(QUEUE_MESSAGE).noargs();
}

}

发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
public class Sender {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() throws IOException {
System.out.printf("[%s]发送消息\n" , DateUtil.now());
rabbitTemplate.convertAndSend(AMQPConfig.QUEUE_DELAY_MESSAGE,"Hello world!");
System.in.read();
}

}

接收者

1
2
3
4
5
6
7
8
9
@Component
public class Receiver {

@RabbitListener(queues = AMQPConfig.QUEUE_MESSAGE)
public void receive(Object message) {
System.out.printf("[%s]收到消息: %s\n" , DateUtil.now(), message);
}

}

配置类

1
2
//和第1种一样,不过QUEUE_DELAY_MESSAGE不再指定此项
.withArgument("x-message-ttl",3000)

发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.coderxi.config.AMQPConfig;
import com.coderxi.util.DateUtil;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

@SpringBootTest
public class Sender {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() throws IOException {
System.out.printf("[%s]发送消息\n" , DateUtil.now());
rabbitTemplate.convertAndSend(AMQPConfig.QUEUE_DELAY_MESSAGE,"Hello world!", message -> {
message.getMessageProperties().setExpiration("3000");
return message;
});
System.in.read();
}

}

接收者不变

[2021-08-25 10:02:50]发送消息
[2021-08-25 10:02:53]收到消息: (Body:’Hello world!’ MessageProperties [headers={x-first-death-exchange=, x-death=[{reason=expired, count=1, exchange=, time=Wed Aug 25 10:02:53 CST 2021, routing-keys=[queue_delay_message], queue=queue_delay_message}], x-first-death-reason=expired, x-first-death-queue=queue_delay_message}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange_dlx, receivedRoutingKey=queue_message, deliveryTag=1, consumerTag=amq.ctag-IKyIU0Eq5QUf6HwXh7gC9Q, consumerQueue=queue_message])

com.coderxi.util.DateUtil
1
2
3
4
5
6
public final class DateUtil {
private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static String now(){
return dateFormat.format(System.currentTimeMillis());
}
}
  • @RabbitListener 注解也可以直接使用到类上,配合 @RabbitHandler 使用
    1
    2
    3
    4
    5
    6
    7
    8
    @Component
    @RabbitListener(queues = AMQPConfig.QUEUE_MESSAGE)
    public class Receiver {
    @RabbitHandler
    public void receive(Object message) {
    System.out.printf("[%s]收到消息: %s\n" , DateUtil.now(), message);
    }
    }
  • 为什么实现Routing模式时使用了 @Bean(QUEUE_1) (@Qualifier(QUEUE_1) Queue queue) 而这次没有使用:
    因为前者只是为了让queue对象在spring中注册为他的名字,后来发现其实不是很必要(至少目前不需要将queue注入到别处),创建binding的参数列表的变量名和创建queue的方法名(默认实例名)保持一致即可