目录
- 1、先给项目导入Redisson依赖
- 2、配置redis
- 3、创建 RedissonConfig 配置
- 4、封装 Redis 延迟队列工具类
- 5、创建延迟队列业务枚举
- 6、定义延迟队列执行器
- 7、创建枚举中定义的Bean,并实现延迟队列执行器
- 8、创建延迟队列消费线程,项目启动完成后开启
- 9、创建一个测试接口,模拟添加延迟队列
- 总结
1、先给项目导入redisson依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.15.4</version> </dependency>
2、配置redis
3、创建 RedissonConfig 配置
4、封装 Redis 延迟队列工具类
import lombok.extern.slf4j.Slf4j; import org.redisson.api.RblockingDeque; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beahttp://www.devze.comns.factory.annotation.Autowired; import org.springframework.stereotype.Component; import Java.util.Map; import java.util.concurrent.TimeUnit; /** * @author bigboss */ @Slf4j @Component public class RedisDelayQueueUtil { @Autowired private RedissonClient redissonClient; /** * 添加延迟队列 * * @param value 队列值 * @param delay 延迟时间 * @param timeUnit 时间单位 * @param queueCode 队列键android * @param <T> */ public <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode) { try { RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); //删除之前进来的id值 delayedQueue.remove(value); delayedQueue.offer(value, delay, timeUnit); log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒"); } catch (Exception e) { log.error("(添加延时队列失败) {}", e.getMessage()); throw new RuntimeException("(添加延时队列失败)"); } } /** * 获取延迟队列 * * @param queueCode * @param <T> * @return * @throws InterruptedException */ public <T> T getDelayQueue(String queueCode) throws InterruptedException { RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode); T value = (T) blockingDeque.take(); return value; } /** * 删除延迟队列 * * @param queueCode * @param <T> * @return */ public <T> T removeDelayQueue(T value, String queueCode) { RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); delayedQueue.remove(value); return value; } }
5、创建延迟队列业务枚举
import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; /** * @author: Bigboss369 * @Created: 2022/8/11/011 */ @Getter @NoArgsConstructor @AllArgsConstructor public enum RedisDelayQueueEnum { pengyuyan("pengyuyan","心跳", "pengyuyan"); /** * 延迟队列 Redis Key */ private String code; /** * 中文描述 */ private String name; /** * 延迟队列具体业务实现的 Bean * 可通过 Spring 的上下文获取 */ private String beanId; }
6、定义延迟队列执行器
/** * @author: Bigboss369 * @Created: 2022/8/11/011 */ public interface RedisDelayQueueHandle<T> { void execute(T t); }
7、创建枚举中定义的Bean,并实现延迟队列执行器
import com.a.service.RedisDelayQueueHandle; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.Map; @Component(value = "pengyuyan") @Slf4j public class PengYuYan implements RedisDelayQueueHandle<编程客栈Map> { @Override public void execute(Map map) { log.info("收到心跳延迟消息map:" + map); } }
8、创建延迟队列消费线程,项目启动完成后开启
import com.a.service.RedisDelayQueueHandle; import com.a.util.RedisDelayQueueUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.sp编程ringframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; /** * @author bigboss */ @Slf4j @Component public class RedisDelayQueueRunner implements CommandLineRunner { @Autowired private RedisDelayQueueUtil redisDelayQueueUtil; @Override public void run(String... args) { RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values(); for (RedisDelayQueueEnum queueEnum : queueEnums) { new Thread(() -> { try { while (true) { Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode()); log.debug("----value-->" + value); RedisDelayQueueHandle redisDelayQueueHandle = SpringUtil.getBean(queueEnum.getBeanId()); redisDelayQueueHandle.execute(value); } } catch (InterruptedException e) { log.error("(Redis延迟队列异常中断) {}", e.getMessage()); } }).start(); } log.info(php"(Redis延迟队列启动成功)"); } }
9、创建一个测试接口,模拟添加延迟队列
发送心跳到队列里
类成功接收到心跳包
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论