您好,登錄后才能下訂單哦!
本篇內容介紹了“Redis監聽過期的key實現流程是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
我們來個最簡單的集群架構,如下圖:
我們上面圖中看到是服務A和服務B就是同一個服務的不同實例。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.0</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.alian</groupId> <artifactId>expiration</artifactId> <version>0.0.1-SNAPSHOT</version> <name>expiration</name> <description>redis-key-expiration-listener</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.package.directory>target</project.package.directory> <java.version>1.8</java.version> <!--com.fasterxml.jackson 版本--> <jackson.version>2.9.10</jackson.version> <!--阿里巴巴fastjson 版本--> <fastjson.version>1.2.68</fastjson.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <!--redis依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>${parent.version}</version> </dependency> <!--用于序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>${jackson.version}</version> </dependency> <!--java 8時間序列化--> <dependency> <groupId>com.fasterxml.jackson.datatype</groupId> <artifactId>jackson-datatype-jsr310</artifactId> <version>${jackson.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.14</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
# 端口
server.port=8090
# 上下文路徑
server.servlet.context-path=/expiration# Redis數據庫索引(默認為0)
spring.redis.database=0
# Redis服務器地址
spring.redis.host=192.168.0.193
#spring.redis.host=127.0.0.1
# Redis服務器連接端口
spring.redis.port=6379
# Redis服務器連接密碼(默認為空)
spring.redis.password=
# 連接池最大連接數(使用負值表示沒有限制)
spring.redis.jedis.pool.max-active=20
# 連接池中的最小空閑連接
spring.redis.jedis.pool.min-idle=10
# 連接池中的最大空閑連接
spring.redis.jedis.pool.max-idle=10
# 連接池最大阻塞等待時間(使用負值表示沒有限制)
spring.redis.jedis.pool.max-wait=20000
# 讀時間(毫秒)
spring.redis.timeout=10000
# 連接超時時間(毫秒)
spring.redis.connect-timeout=10000
RedisConfig
package com.alian.expiration.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; @Configuration public class RedisConfig { /** * redis配置 * * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { // 實例化redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); //設置連接工廠 redisTemplate.setConnectionFactory(redisConnectionFactory); // key采用String的序列化 redisTemplate.setKeySerializer(keySerializer()); // value采用jackson序列化 redisTemplate.setValueSerializer(valueSerializer()); // Hash key采用String的序列化 redisTemplate.setHashKeySerializer(keySerializer()); // Hash value采用jackson序列化 redisTemplate.setHashValueSerializer(valueSerializer()); // 支持事務 // redisTemplate.setEnableTransactionSupport(true); //執行函數,初始化RedisTemplate redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * key類型采用String序列化 * * @return */ private RedisSerializer<String> keySerializer() { return new StringRedisSerializer(); } /** * value采用JSON序列化 * * @return */ private RedisSerializer<Object> valueSerializer() { //設置jackson序列化 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); //設置序列化對象 jackson2JsonRedisSerializer.setObjectMapper(getMapper()); return jackson2JsonRedisSerializer; } /** * 使用com.fasterxml.jackson.databind.ObjectMapper * 對數據進行處理包括java8里的時間 * * @return */ private ObjectMapper getMapper() { ObjectMapper mapper = new ObjectMapper(); //設置可見性 mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); //默認鍵入對象 mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); //設置Java 8 時間序列化 JavaTimeModule timeModule = new JavaTimeModule(); timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss"))); timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss"))); //禁用把時間轉為時間戳 mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); mapper.registerModule(timeModule); return mapper; } @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }
和我們之前整合redis差不多,只不過在最后增加了一個redis消息監聽監聽容器RedisMessageListenerContainer
RedisKeyExpirationListener
package com.alian.expiration.listener; import com.alian.expiration.service.RedisExpirationService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; @Slf4j @Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { @Autowired private RedisExpirationService redisExpirationService; // 把我們上面一步配置的bean注入進去 public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } /** * 針對redis數據失效事件,進行數據處理 * * @param message * @param pattern */ @Override public void onMessage(Message message, byte[] pattern) { // 用戶做自己的業務處理即可,注意message.toString()可以獲取失效的key String expiredKey = message.toString(); log.info("onMessage --> redis 過期的key是:{}", expiredKey); try { // 對過期key進行處理 redisExpirationService.processingExpiredKey(expiredKey); log.info("過期key處理完成:{}", expiredKey); } catch (Exception e) { e.printStackTrace(); log.error("處理redis 過期的key異常:{}", expiredKey, e); } } }
實現的步驟如下:
繼承KeyExpirationEventMessageListener
把redis消息監聽監聽容器RedisMessageListenerContainer 注入到密鑰空間事件消息偵 聽器中
重寫onMessage方法
通過Message 的 toString() 方法就可以獲取到過期的key
對key中關鍵信息進行業務處理,比如 id
RedisExpirationService
package com.alian.expiration.service; import com.alian.expiration.util.SignUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; @Slf4j @Service public class RedisExpirationService { @Autowired private RedisTemplate<String, Object> redisTemplate; public void processingExpiredKey(String expiredKey) { // 如果是優惠券的key(一定要規范命名) if (expiredKey.startsWith("com.mall.coupon.id")) { // 臨時key,此key可以在業務處理完,然后延遲一定時間刪除,或者不處理 String tempKey = SignUtils.md5(expiredKey, "UTF-8"); // 臨時key不存在才設置值,key超時時間為10秒(此處相當于分布式鎖的應用) Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey, "1", 10, TimeUnit.SECONDS); if (Boolean.TRUE.equals(exist)) { log.info("Business Handing..."); // 比如截取里面的id,然后關聯數據庫進行處理 } else { log.info("Other service is handing..."); } } else { log.info("Expired keys without processing"); } } }
基本流程如下:
判斷是否是需要處理的key,一般這種key通過命名規范加以處理
以當前key生成一個新的key作為分布式key
如果redis中不存在這個新的key,則為新的key設置一個值,達到分布式服務處理(核心)
設置成功的,進行業務處理;設置失敗了,說明其他服務正在處理這個key
根據 key 的關鍵信息(比如截取id),進行業務處理
SignUtils
package com.alian.expiration.util; import java.security.MessageDigest; public class SignUtils { public static final String md5(String s, String charset) { char[] hexDigits = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; try { byte[] btInput = s.getBytes(charset); MessageDigest mdInst = MessageDigest.getInstance("MD5"); mdInst.update(btInput); byte[] md = mdInst.digest(); int j = md.length; char[] str = new char[j * 2]; int k = 0; for (byte byte0 : md) { str[k++] = hexDigits[byte0 >>> 4 & 15]; str[k++] = hexDigits[byte0 & 15]; } return new String(str); } catch (Exception var11) { return ""; } } }
簡單模擬下發送一個優惠券數據到redis,然后設置超時時間
package com.alian.expiration; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @Slf4j @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class RedisKeyExpirationTest { @Autowired private RedisTemplate<String, Object> redisTemplate; @Test public void keyExpiration() { // 優惠券信息 String id = "2023021685264735"; Map<String, String> map = new HashMap<>(); map.put("id", id); map.put("amount", "1000"); map.put("type", "1001"); map.put("describe", "滿減紅包"); // 緩存到redis redisTemplate.opsForHash().putAll("com.mall.coupon.id." + id, map); // 設置過期時間 redisTemplate.expire("com.mall.coupon.id." + id, 10, TimeUnit.SECONDS); } }
單實例就是服務只部署了一份,我們啟動一份,端口是8090,然后通過上面的測試類,發送一個消息,結果如下:
10:23:39 701 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
10:23:39 988 INFO [container-2]:Business Handing...
10:23:39 989 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
10:23:50 005 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
10:23:50 005 INFO [container-3]:Expired keys without processing
10:23:50 005 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12
多實例就是服務部署了多份,比如我們啟動兩份,端口分別為8090和8091,然后通過上面的測試類,發送一個消息,8090端口的服務結果如下(Business Handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Business Handing...
11:39:06 707 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12
8091端口的服務結果如下(Other service is handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Other service is handing...
11:39:06 707 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12
結果分析:
多實例的情況下,每個實例都會收到過期key通知
通過redis分布式鎖,實現只有一個實例會進行業務處理,防止重復
使用分布式鎖會有一個新的key過期,并且收到該key的通知,你可以業務執行完延遲一定時間(避免重復執行),再刪除,也可以不處理(因為本就不是要處理業務的key)
“Redis監聽過期的key實現流程是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。