Redis+SpringBoot+自定义注解实现消息发布订阅


发表于 修改于 后端随手写写 908 字 5 分钟

实现原理

参考了使用Redis实现MQ

Redis中的publish-subscribe

redis中已经实现了publish-subscribe,订阅者(Subscriber)可以订阅自己感兴趣的频道(Channel),发布者(Publisher)可以将消息发往指定的频道(Channel),正式通过这种方式,可以将消息的发送者和接收者解耦。另外,由于可以动态的Subscribe和Unsubscribe,也可以提高系统的灵活性和可扩展性。

  • 打开redis客户端,使用SUBSCRIBE命令就可以订阅消息了,如:
SUBSCRIBE channel1 channel2
  • 发布命令如下:
PUBLISH channel1 "Hello,world!"
  • 这样在消息订阅的一方就可以接收到消息了,如下:
1) "message"
2) "channel1"
3) "Hello,world!"

在SpringBoot中使用

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("channel1"));
return container;
}
@Bean //一定要将消息适配器注册到spring容器
MessageListenerAdapter listenerAdapter(Receiver receiver/*自定义的类的对象*/) {
return new MessageListenerAdapter(receiver, "receiveMessage"/*反射调用这个对象的这个方法*/);
}

ok,也就是只要提供一个RedisMessageListenerContainer和一堆MessageListenerAdapter给spring即可 而这些完全可以通过给方法加注解+反射直接创建出来

理论存在,实践开始!

  • 首先需要拿到所有带有 @RedisMessageListener 注解的方法 在苦苦搜寻了一番后,在stackoverflow上找到了org.reflections.Reflections这个类(maven依赖地址),使用下面的方式就ok了
Set<Method> allMethods = new Reflections("扫描包名", new MethodAnnotationsScanner()).getMethodsAnnotatedWith(RedisMessageListener.class);
  • 那么应该如何直接获取当前项目的basePackage呢? 又苦苦搜寻了一番后,终于在springboot的issues中找到了AutoConfigurationPackages.get() 看一下所需参数,BeanFactory!ok,从applicationContext中掏出来传给他 打印输出返回值 [com.coderxi],完美!拿到第一个即可!
String baskPackage = AutoConfigurationPackages.get(applicationContext.getBeanFactory()).get(0);
  • 拿到了方法,也就能够拿到方法所在的类,再从容器中实例,就满足了构造MessageListenerAdapter的参数 但是,这个MessageListenerAdapter必须注册到容器
  • 那么应该如何使用Java代码手动注册对象到Spring容器呢? 第三次苦苦搜寻了一番后,又通过尝试,找到了最优雅最简洁的方式 使用 AnnotationConfigApplicationContextregisterBean 注册进去,然后get回来就ok 实例的名字不能重复,我就直接采用了 方法名+类名+"MessageListenerAdapter" 作为实例名
  • 最后,按照上面的 RedisMessageListenerContainer 的配置方式 将所有 MessageListenerAdapter 实例和对应的 channel(也就是topic) 交给他即可

成果

com.coderxi.redismq.RedisMessageListenerAdapterConfiguration
import org.reflections.Reflections;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
@Configuration
public class RedisMessageListenerAdapterConfiguration {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, Map<MessageListenerAdapter, String[]> redisMessageListenerAdapterAndTopics) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
for (Map.Entry<MessageListenerAdapter, String[]> entry : redisMessageListenerAdapterAndTopics.entrySet()) {
for (String topic : entry.getValue()) {
container.addMessageListener(entry.getKey(), new PatternTopic(topic));
}
}
return container;
}
@Bean
Map<MessageListenerAdapter, String[]> redisMessageListenerAdapterAndTopics(AnnotationConfigApplicationContext applicationContext) {
Map<MessageListenerAdapter, String[]> result = new LinkedHashMap<>();
String baskPackage = AutoConfigurationPackages.get(applicationContext.getBeanFactory()).get(0);
Set<Method> allMethods = new Reflections(baskPackage, new MethodAnnotationsScanner()).getMethodsAnnotatedWith(RedisMessageListener.class);
for (Method method : allMethods) {
RedisMessageListener annotation = method.getAnnotation(RedisMessageListener.class);
String newMessageListenerAdapterName = method.getName() + method.getDeclaringClass().getSimpleName() + "MessageListenerAdapter";
applicationContext.registerBean(newMessageListenerAdapterName, MessageListenerAdapter.class, applicationContext.getBean(method.getDeclaringClass()), method.getName());
result.put(applicationContext.getBean(newMessageListenerAdapterName,MessageListenerAdapter.class),annotation.channels());
}
return result;
}
}
com.coderxi.redismq.RedisMessageListener
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisMessageListener {
String[] channels();
}
com.coderxi.test.Sender
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
@SpringBootTest
public class Sender {
@Autowired
StringRedisTemplate redisTemplate;
@Test
void send(){
redisTemplate.convertAndSend("channel1","Hello,world!");
}
}
com.coderxi.test.Receiver
import com.coderxi.redismq.RedisMessageListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
@RedisMessageListener(channels = "channel1")
public void receive1(String message) {
System.out.println("receive1: " + message);
}
@RedisMessageListener(channels = {"channel1","channel2"})
public void receive2(String message) {
System.out.println("receive2: " + message);
}
}
  • channel1频道发送消息,我们可以看到两个都响应了
receive2: Hello,world!
receive1: Hello,world!
  • channel2频道发送消息,我们可以看到只有receive2响应了
receive2: Hello,world!

评论