实现原理

参考了使用Redis实现MQ

Redis中的publish-subscribe

redis中已经实现了publish-subscribe,订阅者(Subscriber)可以订阅自己感兴趣的频道(Channel),发布者(Publisher)可以将消息发往指定的频道(Channel),正式通过这种方式,可以将消息的发送者和接收者解耦。另外,由于可以动态的Subscribe和Unsubscribe,也可以提高系统的灵活性和可扩展性。

  • 打开redis客户端,使用SUBSCRIBE命令就可以订阅消息了,如:

    1
    SUBSCRIBE channel1 channel2
  • 发布命令如下:

    1
    PUBLISH channel1 "Hello,world!"
  • 这样在消息订阅的一方就可以接收到消息了,如下:

    1
    2
    3
    1) "message"
    2) "channel1"
    3) "Hello,world!"

在SpringBoot中使用

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("channel1"));
return container;
}

@Bean //一定要将消息适配器注册到spring容器
MessageListenerAdapter listenerAdapter(Receiver receiver/*自定义的类的对象*/) {
return new MessageListenerAdapter(receiver, "receiveMessage"/*反射调用这个对象的这个方法*/);
}

ok,也就是只要提供一个RedisMessageListenerContainer和一堆MessageListenerAdapter给spring即可
而这些完全可以通过给方法加注解+反射直接创建出来

理论存在,实践开始!

  • 首先需要拿到所有带有 @RedisMessageListener 注解的方法
    在苦苦搜寻了一番后,在stackoverflow上找到了org.reflections.Reflections这个类(maven依赖地址),使用下面的方式就ok了
    1
    Set<Method> allMethods = new Reflections("扫描包名", new MethodAnnotationsScanner()).getMethodsAnnotatedWith(RedisMessageListener.class);
  • 那么应该如何直接获取当前项目的basePackage呢?
    又苦苦搜寻了一番后,终于在springboot的issues中找到了AutoConfigurationPackages.get()
    看一下所需参数,BeanFactory!ok,从applicationContext中掏出来传给他
    打印输出返回值 [com.coderxi],完美!拿到第一个即可!
    1
    String baskPackage = AutoConfigurationPackages.get(applicationContext.getBeanFactory()).get(0);
  • 拿到了方法,也就能够拿到方法所在的类,再从容器中实例,就满足了构造MessageListenerAdapter的参数
    但是,这个MessageListenerAdapter必须注册到容器
  • 那么应该如何使用Java代码手动注册对象到Spring容器呢?
    第三次苦苦搜寻了一番后,又通过尝试,找到了最优雅最简洁的方式
    使用 AnnotationConfigApplicationContextregisterBean 注册进去,然后get回来就ok
    实例的名字不能重复,我就直接采用了 方法名+类名+"MessageListenerAdapter" 作为实例名
  • 最后,按照上面的 RedisMessageListenerContainer 的配置方式
    将所有 MessageListenerAdapter 实例和对应的 channel(也就是topic) 交给他即可

成果

com.coderxi.redismq.RedisMessageListenerAdapterConfiguration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.reflections.Reflections;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

@Configuration
public class RedisMessageListenerAdapterConfiguration {

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, Map<MessageListenerAdapter, String[]> redisMessageListenerAdapterAndTopics) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
for (Map.Entry<MessageListenerAdapter, String[]> entry : redisMessageListenerAdapterAndTopics.entrySet()) {
for (String topic : entry.getValue()) {
container.addMessageListener(entry.getKey(), new PatternTopic(topic));
}
}
return container;
}

@Bean
Map<MessageListenerAdapter, String[]> redisMessageListenerAdapterAndTopics(AnnotationConfigApplicationContext applicationContext) {
Map<MessageListenerAdapter, String[]> result = new LinkedHashMap<>();
String baskPackage = AutoConfigurationPackages.get(applicationContext.getBeanFactory()).get(0);
Set<Method> allMethods = new Reflections(baskPackage, new MethodAnnotationsScanner()).getMethodsAnnotatedWith(RedisMessageListener.class);
for (Method method : allMethods) {
RedisMessageListener annotation = method.getAnnotation(RedisMessageListener.class);
String newMessageListenerAdapterName = method.getName() + method.getDeclaringClass().getSimpleName() + "MessageListenerAdapter";
applicationContext.registerBean(newMessageListenerAdapterName, MessageListenerAdapter.class, applicationContext.getBean(method.getDeclaringClass()), method.getName());
result.put(applicationContext.getBean(newMessageListenerAdapterName,MessageListenerAdapter.class),annotation.channels());
}
return result;
}

}
com.coderxi.redismq.RedisMessageListener
1
2
3
4
5
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisMessageListener {
String[] channels();
}
com.coderxi.test.Sender
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;

@SpringBootTest
public class Sender {

@Autowired
StringRedisTemplate redisTemplate;

@Test
void send(){
redisTemplate.convertAndSend("channel1","Hello,world!");
}

}
com.coderxi.test.Receiver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import com.coderxi.redismq.RedisMessageListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

@RedisMessageListener(channels = "channel1")
public void receive1(String message) {
System.out.println("receive1: " + message);
}

@RedisMessageListener(channels = {"channel1","channel2"})
public void receive2(String message) {
System.out.println("receive2: " + message);
}

}
  • channel1频道发送消息,我们可以看到两个都响应了
    1
    2
    receive2: Hello,world!
    receive1: Hello,world!
  • channel2频道发送消息,我们可以看到只有receive2响应了
    1
    receive2: Hello,world!