1 前言
Redis 基本上是互聯(lián)網(wǎng)公司必備的工具了,Redis的應(yīng)用場(chǎng)景實(shí)在太多了,但是有很多相似的功能如果每個(gè)項(xiàng)目都要實(shí)現(xiàn)一遍就顯得太麻煩了,所以為了方便,我打算開(kāi)發(fā)一個(gè)基于 Redis 的工具集,盡量做到開(kāi)箱即用。
2 目前實(shí)現(xiàn)功能
這個(gè)工具集并沒(méi)有開(kāi)發(fā)完成,實(shí)現(xiàn)了部分功能,如下圖

簡(jiǎn)單介紹下已經(jīng)實(shí)現(xiàn)的模塊:
common : 整個(gè)項(xiàng)目公共模塊,比如AOP工具等;
delay: Redis實(shí)現(xiàn)的延遲隊(duì)列;
lock: Redis實(shí)現(xiàn)的分布式鎖;
mq: Redis實(shí)現(xiàn)消息隊(duì)列;
query: Redis實(shí)現(xiàn)分頁(yè)模糊查詢(xún);
web: Redis實(shí)現(xiàn)web相關(guān)的功能;
duplicate :防止重復(fù)提交;
以上的這些模塊都是已經(jīng)實(shí)現(xiàn)的了,還有 社交、限流、冪等相關(guān)功能后面會(huì)陸續(xù)實(shí)現(xiàn)。
3 如何使用
1.引入 Maven 依賴(lài)
目前可以下載代碼上傳到自己的私服或者本地倉(cāng)庫(kù),后面會(huì)推到 Maven 中央倉(cāng)庫(kù)
cn.org.wangchangjiu redis-util-spring-boot-starter 1.0.0-SNAPSHOT
2.配置文件(application.yaml)開(kāi)啟各模塊功能開(kāi)關(guān)
redis: util: mq: enable:true delay: enable:true
3.實(shí)現(xiàn)消息發(fā)送者
MQ消息發(fā)送:

延遲消息發(fā)送:

4.實(shí)現(xiàn)消息監(jiān)聽(tīng)器
MQ消息監(jiān)聽(tīng)器:

延遲消息監(jiān)聽(tīng)器:

4 MQ和delay實(shí)現(xiàn)細(xì)節(jié)
MQ實(shí)現(xiàn)細(xì)節(jié)
容器啟動(dòng)時(shí),簡(jiǎn)單來(lái)說(shuō)就是通過(guò)springboot自動(dòng)裝配,創(chuàng)建一些Bean,如下圖:

值得注意的是,springboot3.X 自動(dòng)裝配方式有點(diǎn)變化,需要?jiǎng)?chuàng)建文件 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件,文件內(nèi)容就直接寫(xiě) 自動(dòng)配置類(lèi)

RedisUtilAutoConfiguration 主自動(dòng)裝配類(lèi)會(huì) import 各個(gè)模塊的自動(dòng)裝配類(lèi):

我們以 RedisStreamAutoConfiguration 為例:

該裝配類(lèi)生效需要顯示打開(kāi),然后就是創(chuàng)建各種Bean。
最主要的Bean有:
RedisMessageConsumerManager:

該Bean實(shí)現(xiàn)了 BeanPostProcessor 接口,主要作用是,獲取被注解 RedisMessageListener 修飾的方法,把信息封裝在 RedisMessageConsumerContainer 對(duì)象里,方便后面反射調(diào)用。

StreamMessageListenerContainer:
這個(gè)Bean主要是做 redis MQ 的配置,比如配置:一次最多獲取多少條消息、沒(méi)有消息時(shí)阻塞時(shí)間、執(zhí)行任務(wù)的executor、錯(cuò)誤處理器、以及消費(fèi)組、是否自動(dòng)ACK等配置,具體代碼如下:
@Bean(initMethod="start",destroyMethod="stop")
@DependsOn("redisMessageConsumerManager")
@ConditionalOnMissingBean
publicStreamMessageListenerContainer>streamMessageListenerContainer(@AutowiredRedisMessageConsumerManagerredisMessageConsumerManager,
@AutowiredRedisConnectionFactoryredisConnectionFactory,
@AutowiredErrorHandlererrorHandler){
MyRedisStreamProperties.Optionsoptions=myRedisStreamProperties.getOptions();
StreamMessageListenerContainer.StreamMessageListenerContainerOptions>containerOptions=
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
//一次最多獲取多少條消息
.batchSize(options.getBatchSize())
//運(yùn)行Stream的polltask
.executor(getStreamMessageListenerExecutor())
//Stream中沒(méi)有消息時(shí),阻塞多長(zhǎng)時(shí)間,需要比`spring.redis.timeout`的時(shí)間小
.pollTimeout(options.getPollTimeout())
//獲取消息的過(guò)程或獲取到消息給具體的消息者處理的過(guò)程中,發(fā)生了異常的處理
.errorHandler(errorHandler)
.build();
StreamMessageListenerContainer>streamMessageListenerContainer=
StreamMessageListenerContainer.create(redisConnectionFactory,containerOptions);
//獲取被RedisMessageListener注解修飾的bean
MapconsumerContainerGroups=
redisMessageConsumerManager.getConsumerContainerGroups();
//循環(huán)遍歷,創(chuàng)建消費(fèi)組
consumerContainerGroups.forEach((groupQueue,redisMessageConsumerContainer)->{
String[]groupQueues=groupQueue.split("#");
//創(chuàng)建消費(fèi)組
createGroups(groupQueues);
RedisMessageListenerredisMessageListener=redisMessageConsumerContainer.getRedisMessageListener();
if(!redisMessageListener.useGroup()){
//獨(dú)立消費(fèi)不使用組
streamMessageListenerContainer.receive(StreamOffset.fromStart(groupQueues[1]),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}else{
//消費(fèi)組消費(fèi)
if(redisMessageListener.autoAck()){
//自動(dòng)ACK
streamMessageListenerContainer.receiveAutoAck(Consumer.from(groupQueues[0],"consumer:"+UUID.randomUUID()),
StreamOffset.create(groupQueues[1],ReadOffset.lastConsumed()),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}else{
//手動(dòng)ACK
streamMessageListenerContainer.receive(Consumer.from(groupQueues[0],"consumer:"+UUID.randomUUID()),
StreamOffset.create(groupQueues[1],ReadOffset.lastConsumed()),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}
}
});
returnstreamMessageListenerContainer;
}
/**
*創(chuàng)建消費(fèi)組
*@paramgroupQueues
*/
privatevoidcreateGroups(String[]groupQueues){
//判斷是否存在隊(duì)列Key
if(stringRedisTemplate.hasKey(groupQueues[1])){
//獲取消費(fèi)組沒(méi)有則創(chuàng)建
StreamInfo.XInfoGroupsgroups=stringRedisTemplate.opsForStream().groups(groupQueues[1]);
if(groups.isEmpty()){
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}else{
AtomicBooleanexists=newAtomicBoolean(false);
groups.forEach(xInfoGroup->{
if(xInfoGroup.groupName().equals(groupQueues[0])){
exists.set(true);
}
});
if(!exists.get()){
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}
}
}else{
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}
}
//todo后面這個(gè)線程池也可以交由用戶(hù)配置
privateExecutorgetStreamMessageListenerExecutor(){
AtomicIntegerindex=newAtomicInteger(1);
intprocessors=Runtime.getRuntime().availableProcessors();
ThreadPoolExecutorexecutor=newThreadPoolExecutor(processors,processors,0,TimeUnit.SECONDS,
newLinkedBlockingDeque<>(),r->{
Threadthread=newThread(r);
thread.setName("async-stream-consumer-"+index.getAndIncrement());
thread.setDaemon(true);
returnthread;
});
returnexecutor;
}
發(fā)送消息流程:

redis 延遲隊(duì)列的實(shí)現(xiàn)原理和這個(gè)差不多,主要是 redission延遲隊(duì)列 + 自定義注解 + 反射,代碼都差不多。
-
代碼
+關(guān)注
關(guān)注
30文章
4956瀏覽量
73490 -
隊(duì)列
+關(guān)注
關(guān)注
1文章
46瀏覽量
11204 -
Redis
+關(guān)注
關(guān)注
0文章
390瀏覽量
12131
原文標(biāo)題:為了方便開(kāi)發(fā),我打算實(shí)現(xiàn)一個(gè)Redis 工具集
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
Redis的LRU實(shí)現(xiàn)和應(yīng)用
Redis Stream應(yīng)用案例
centos7 redis的安裝
Redis Cluster的基本原理及實(shí)現(xiàn)細(xì)節(jié)
Windows環(huán)境下使用Redis緩存工具的圖文詳細(xì)方法
談?wù)?b class='flag-5'>Redis怎樣配置實(shí)現(xiàn)主從復(fù)制?
Redis實(shí)現(xiàn)限流的三種方式分享
Redis官方可視化工具功能強(qiáng)大
Redis工具集的實(shí)現(xiàn)和使用
評(píng)論