实现原理
参考了使用Redis实现MQ
Redis中的publish-subscribe
redis中已经实现了publish-subscribe,订阅者(Subscriber)可以订阅自己感兴趣的频道(Channel),发布者(Publisher)可以将消息发往指定的频道(Channel),正式通过这种方式,可以将消息的发送者和接收者解耦。另外,由于可以动态的Subscribe和Unsubscribe,也可以提高系统的灵活性和可扩展性。
- 打开redis客户端,使用SUBSCRIBE命令就可以订阅消息了,如:
SUBSCRIBE channel1 channel2
- 发布命令如下:
PUBLISH channel1 "Hello,world!"
- 这样在消息订阅的一方就可以接收到消息了,如下:
1) "message"2) "channel1"3) "Hello,world!"
在SpringBoot中使用
@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("channel1")); return container;}
@Bean //一定要将消息适配器注册到spring容器MessageListenerAdapter listenerAdapter(Receiver receiver/*自定义的类的对象*/) { return new MessageListenerAdapter(receiver, "receiveMessage"/*反射调用这个对象的这个方法*/);}
ok,也就是只要提供一个RedisMessageListenerContainer和一堆MessageListenerAdapter给spring即可 而这些完全可以通过给方法加注解+反射直接创建出来
理论存在,实践开始!
- 首先需要拿到所有带有
@RedisMessageListener
注解的方法 在苦苦搜寻了一番后,在stackoverflow上找到了org.reflections.Reflections这个类(maven依赖地址),使用下面的方式就ok了
Set<Method> allMethods = new Reflections("扫描包名", new MethodAnnotationsScanner()).getMethodsAnnotatedWith(RedisMessageListener.class);
- 那么应该如何直接获取当前项目的basePackage呢?
又苦苦搜寻了一番后,终于在springboot的issues中找到了
AutoConfigurationPackages.get()
看一下所需参数,BeanFactory
!ok,从applicationContext
中掏出来传给他 打印输出返回值[com.coderxi]
,完美!拿到第一个即可!
String baskPackage = AutoConfigurationPackages.get(applicationContext.getBeanFactory()).get(0);
- 拿到了方法,也就能够拿到方法所在的类,再从容器中实例,就满足了构造
MessageListenerAdapter
的参数 但是,这个MessageListenerAdapter必须注册到容器 - 那么应该如何使用Java代码手动注册对象到Spring容器呢?
第三次苦苦搜寻了一番后,又通过尝试,找到了最优雅最简洁的方式
使用
AnnotationConfigApplicationContext
的registerBean
注册进去,然后get回来就ok 实例的名字不能重复,我就直接采用了方法名+类名+"MessageListenerAdapter"
作为实例名 - 最后,按照上面的
RedisMessageListenerContainer
的配置方式 将所有MessageListenerAdapter
实例和对应的channel
(也就是topic
) 交给他即可
成果
import org.reflections.Reflections;import org.reflections.scanners.MethodAnnotationsScanner;import org.springframework.boot.autoconfigure.AutoConfigurationPackages;import org.springframework.context.annotation.AnnotationConfigApplicationContext;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.listener.PatternTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.lang.reflect.Method;import java.util.LinkedHashMap;import java.util.Map;import java.util.Set;
@Configurationpublic class RedisMessageListenerAdapterConfiguration {
@Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, Map<MessageListenerAdapter, String[]> redisMessageListenerAdapterAndTopics) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); for (Map.Entry<MessageListenerAdapter, String[]> entry : redisMessageListenerAdapterAndTopics.entrySet()) { for (String topic : entry.getValue()) { container.addMessageListener(entry.getKey(), new PatternTopic(topic)); } } return container; }
@Bean Map<MessageListenerAdapter, String[]> redisMessageListenerAdapterAndTopics(AnnotationConfigApplicationContext applicationContext) { Map<MessageListenerAdapter, String[]> result = new LinkedHashMap<>(); String baskPackage = AutoConfigurationPackages.get(applicationContext.getBeanFactory()).get(0); Set<Method> allMethods = new Reflections(baskPackage, new MethodAnnotationsScanner()).getMethodsAnnotatedWith(RedisMessageListener.class); for (Method method : allMethods) { RedisMessageListener annotation = method.getAnnotation(RedisMessageListener.class); String newMessageListenerAdapterName = method.getName() + method.getDeclaringClass().getSimpleName() + "MessageListenerAdapter"; applicationContext.registerBean(newMessageListenerAdapterName, MessageListenerAdapter.class, applicationContext.getBean(method.getDeclaringClass()), method.getName()); result.put(applicationContext.getBean(newMessageListenerAdapterName,MessageListenerAdapter.class),annotation.channels()); } return result; }
}
@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.METHOD)public @interface RedisMessageListener { String[] channels();}
import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.data.redis.core.StringRedisTemplate;
@SpringBootTestpublic class Sender {
@Autowired StringRedisTemplate redisTemplate;
@Test void send(){ redisTemplate.convertAndSend("channel1","Hello,world!"); }
}
import com.coderxi.redismq.RedisMessageListener;import org.springframework.stereotype.Component;
@Componentpublic class Receiver {
@RedisMessageListener(channels = "channel1") public void receive1(String message) { System.out.println("receive1: " + message); }
@RedisMessageListener(channels = {"channel1","channel2"}) public void receive2(String message) { System.out.println("receive2: " + message); }
}
- 向
channel1
频道发送消息,我们可以看到两个都响应了
receive2: Hello,world!receive1: Hello,world!
- 向
channel2
频道发送消息,我们可以看到只有receive2响应了
receive2: Hello,world!
评论