开发者

一文带你搞懂Redis Stream的6种消息处理模式

开发者 https://www.devze.com 2025-05-06 09:01 出处:网络 作者: 风象南
目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2. 消费者组模式(Consumer Groups)基本概念核心命令实现示例使用场景优缺点3. 阻塞式消费模式(blocking Consumption)基本概念
目录
  • 1. 简单消费模式(Simple Consumption)
    • 基本概念
    • 核心命令
    • 实现示例
    • 使用场景
    • 优缺点
  • 2. 消费者组模式(Consumer Groups)
    • 基本概念
    • 核心命令
    • 实现示例
    • 使用场景
    • 优缺点
  • 3. 阻塞式消费模式(blocking Consumption)
    • 基本概念
    • 核心命令
    • 实现示例
    • 使用场景
    • 优缺点
  • 4. 扇出模式(Fan-out Pattern)
    • 基本概念
    • 核心命令
    • 实现示例
    • 使用示例
    • 使用场景
    • 优缺点
  • 5. 重试与恢复模式(Retry and Recovery)
    • 基本概念
    • 核心命令
    • 实现示例
    • 使用场景
    • 优缺点
  • 6. 流处理窗口模式(Streaming Window Processing)
    • 基本概念
    • 实现方式
    • 实现示例
    • 使用场景
    • 优缺点
  • 结论

    Redis 5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,弥补了之前发布/订阅模式的不足,如消息持久化、消费者组、消息确认等特性。

    Redis Stream结合了传统消息队列和时序数据库的特点,适用于日志收集、事件驱动应用、实时分析等多种场景。

    本文将介绍Redis Stream的6种消息处理模式。

    1. 简单消费模式(Simple Consumption)

    基本概念

    简单消费模式是Redis Stream最基础的使用方式,不使用消费者组,直接读取流中的消息。生产者将消息追加到流中,消费者通过指定起始ID来读取消息。

    核心命令

    # 发布消息
    XADD stream_name [ID] field value [field value ...]
    
    # 读取消息
    XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id

    实现示例

    Redis CLI

    # 添加消息到stream
    > XADD mystream * sensor_id 1234 temperature 19.8 humidity 56
    "1647257548956-0"
    
    # 从头开始读取所有消息
    > XREAD STREAMS mystream 0
    1) 1) "mystream"
       2) 1) 1) "1647257548956-0"
             2) 1) "sensor_id"
                2) "1234"
                3) "temperature"
                4) "19.8"
                5) "humidity"
                6) "56"
    
    # 从指定ID开始读取
    > XREAD STREAMS mystream 1647257548956-0
    (empty list or set)
    
    # 从最新的消息ID之后开始读取(阻塞等待新消息)
    > XREAD BLOCK 5000 STREAMS mystream $
    (nil)

    Java Spring Boot示例

    @Service
    public class SimpleStreamService {
        
        @Autowired
        private StringRedisTemplate redisTemplate;
        
        /**
         * 发布消息到Stream
         */
        public String publishEvent(String streamKey, Map<String, Object> eventData) {
            StringRecord record = StreamRecords.string(eventData).withStreamKey(streamKey);
            return redisTemplate.opsForStream().add(record).getValue();
        }
        
        /**
         * 从指定位置开始读取消息
         */
        public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) {
            StreamReadOptions readOptions = StreamReadOptions.empty().count(count);
            return redisTemplate.opsForStream().read(readOptions, StreamOffset.from(streamKey, ReadOffset.from(startId)));
        }
        
        /**
         * 阻塞式读取消息
         */
        public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) {
            StreamReadOptions readOptions = StreamReadOptions.empty().count(10).block(Duration.ofMillis(timeoutMillis));
            return redisTemplate.opsForStream().read(readOptions, StreamOffset.latest(streamKey));
        }
    }

    使用场景

    • 简单的事件日志记录
    • 单一消费者场景
    • 时间序列数据收集
    • 开发和调试阶段

    优缺点

    优点

    • 实现简单,无需创建和管理消费者组
    • 直接控制从哪个位置开始消费消息
    • 适合单个消费者场景

    缺点

    • 无法实现负载均衡
    • 无法追踪消息确认状态
    • 需要手动管理已读消息ID
    • 服务重启需自行记录上次读取位置

    2. 消费者组模式(Consumer Groups)

    基本概念

    消费者组允许多个消费者共同处理一个流的消息,实现负载均衡,并提供消息确认机制,确保消息至少被处理一次。每个消费者组维护自己的消费位置,不同消费者组之间互不干扰。

    核心命令

    # 创建消费者组
    XGROUP CREATE stream_name group_name [ID|$] [MKSTREAM]
    
    # 从消费者组读取消息
    XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID]
    
    # 确认消息处理完成
    XACK stream_name group_name message_id [message_id ...]

    实现示例

    Redis CLI

    # 创建消费者组
    > XGROUP CREATE mystream processing-group $ MKSTREAM
    OK
    
    # 消费者1读取消息
    > XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream >
    1) 1) "mystream"
       2) 1) 1) "1647257548956-0"
             2) 1) "sensor_id"
                2) "1234"
                3) "temperature"
                4) "19.8"
                5) "humidity"
                6) "56"
    
    # 确认消息已处理
    > XACK mystream processing-group 1647257548956-0
    (integer) 1
    
    # 消费者2读取消息(已无未处理消息)
    > XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream >
    1) 1) "mystream"
       2) (empty list or set)

    Java Spring Boot示例

    @Service
    public class ConsumerGroupService {
        
        @Autowired
        private StringRedisTemplate redisTemplate;
        
        /**
         * 创建消费者组
         */
        public void createGroup(String streamKey, String groupName) {
            try {
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            } catch (RedisSystemException e) {
                // 处理流不存在的情况
                if (e.getRootCause() instanceof RedisCommandExecutionException 
                        && e.getRootCause().getMessage().contains("NOGROUP")) {
                    redisTemplate.opsForStream().createGroup(ReadOffset.from("0"), streamKey, groupName);
                } else {
                    throw e;
                }
            }
        }
        
        /**
         * 从消费者组读取消息
         */
        public List<MapRecord<String, Object, Object>> readFromGroup(
                String streamKey, String groupName, String consumerName, int count) {
            
            StreamReadOptions options = StreamReadOptions.empty().count(count);
            return redisTemplate.opsForStream().read(
                    Consumer.from(groupName, consumerName),
                    options,
                    StreamOffset.create(streamKey, ReadOffset.lastConsumed())
            );
        }
        
        /**
         * 阻塞式从消费者组读取消息
         */
        public List<MapRecord<String, Object, Object>> readFromGroupBlocking(
                String streamKey, String groupName, String consumerName, int count, Duration timeout) {
            
            StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout);
            return redisTemplate.opsForStream().read(
                    Consumer.from(groupName, consumerName),
                    options,
                    StreamOffset.create(streamKey, ReadOffset.lastConsumed())
            );
        }
        
        /**
         * 确认消息已处理
         */
        public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) {
            return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds);
        }
    }

    使用场景

    • 需要横向扩展消息处理能力的系统
    • 要求消息可靠处理的业务场景
    • 实现消息工作队列
    • 微服务间的事件传递

    优缺点

    优点

    • 多个消费者可以并行处理消息
    • 提供消息确认机制,保证消息不丢失
    • 支持消费者崩溃后恢复处理
    • 每个消费者组维护独立的消费位置

    缺点

    • 实现相对复杂
    • 需要妥善管理消费者组和消费者
    • 需要显式处理消息确认
    • 需要定期处理未确认的消息

    3. 阻塞式消费模式(Blocking Consumption)

    基本概念

    阻塞式消费允许消费者在没有新消息时保持连接,等待新消息到达。这种模式减少了轮询开销,提高了实时性,适合对消息处理时效性要求高的场景。

    核心命令

    # 阻塞式简单消费
    XREAD BLOCK milliseconds STREAMS stream_name ID
    
    # 阻塞式消费者组消费
    XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >

    实现示例

    Redis CLI

    # 阻塞等待新消息(最多等待10秒)
    > XREAD BLOCK 10000 STREAMS mystream $
    (nil)  # 如果10秒内没有新消息
    
    # 使用消费者组的阻塞式消费
    > XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream >
    (nil)  # 如果10秒内没有新分配的消息

    Java Spring Boot示例

    @Service
    public class BlockingStreamConsumerService {
        
        @Autowired
        private StringRedisTemplate redisTemplate;
        
        /**
         * 阻塞式消息消费者任务
         */
        @Async
        public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) {
            StreamReadOptions options = StreamReadOptions.empty()
                    .count(1)
                    .block(timeout);
            
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // 阻塞读取消息
                    List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                            .read(options, StreamOffset.from(streamKey, ReadOffset.from(lastId)));
                    
                    if (records != null && !records.isEmpty()) {
                        for (MapRecord<String, Object, Object> record : records) {
                            // 处理消息
                            processMessage(record);
                            
                            // 更新最后读取的ID
                            lastId = record.getId().getValue();
                        }
                    } else {
                        // 超时未读取到消息,可以执行一些其他逻辑
            www.devze.com        }
                } catch (Exception e) {
                    // 异常处理
                    log.error("Error reading from stream: {}", e.getMessage(), e);
                    try {
                        Thread.sleep(1000); // 出错后等待一段时间再重试
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        
        /**
         * 阻塞式消费者组消费
         */
        @Async
        public void startGroupBlockingConsumer(
                String streamKey, String groupName, String consumerName, Duration timeout) {
            
            StreamReadOptions options = StreamReadOptions.empty()
                    .count(1)
                    .block(timeout);
            
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // 阻塞读取消息
                    List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                            .read(Consumer.from(groupName, consumerName),
                                   options,
                                   StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
                    
                    if (records != null && !records.isEmpty()) {
                        for (MapRecord<String, Object, Object> record : records) {
                            try {
        android                        // 处理消息
                                processMessage(record);
                                
                                // 确认消息
                                redisTemplate.opsForStream()
                                        .acknowledge(streamKey, groupName, record.getId().getValue());
                            } catch (Exception e) {
                                // 处理失败,记录日志
                                log.error("Error processing message: {}", e.getMessage(), e);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("Error reading from stream group: {}", e.getMessage(), e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        
        private void processMessage(MapRecord<String, Object, Object> record) {
            // 实际消息处理逻辑
            log.info("Processing message: {}", record);
            // ...处理消息的具体业务逻辑
        }
    }

    使用场景

    • 实时数据处理系统
    • 事件驱动的任务处理
    • 低延迟要求的应用
    • 即时通讯系统
    • 通知服务

    优缺点

    优点

    • 减少轮询带来的资源浪费
    • 实时性好,消息到达后立即处理
    • 降低Redis和客户端的负载
    • 节省CPU和网络资源

    缺点

    • 长连接可能占用Redis连接资源
    • 需要合理设置超时时间
    • 可能需要处理网络中断后的重连
    • 消费者需要具备并发处理能力

    4. 扇出模式(Fan-out Pattern)

    基本概念

    扇出模式允许多个独立的消费者组同时消费同一个流中的所有消息,类似于发布/订阅模式,但具有消息持久化和回溯能力。每个消费者组独立维护自己的消费位置。

    核心命令

    创建多个消费者组,它们都独立消费同一个流:

    XGROUP CREATE stream_name group_name_1 $ MKSTREAM
    XGROUP CREATE stream_name group_name_2 $ MKSTREAM
    XGROUP CREATE stream_name group_name_3 $ MKSTREAM

    实现示例

    Redis CLI

    # 创建多个消费者组
    > XGROUP CREATE notifications analytics-group $ MKSTREAM
    OK
    > XGROUP CREATE notifications email-group $ MKSTREAM
    OK
    > XGROUP CREATE notifications mobile-group $ MKSTREAM
    OK
    
    # 添加一条消息
    > XADD notifications * type user_signup user_id 1001 email "user@example.com"
    "1647345678912-0"
    
    # 从各个消费者组读取(每个组都能收到所有消息)
    > XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications >
    1) 1) "notifications"
       2) 1) 1) "1647345678912-0"
             2) 1) "type"
                2) "user_signup"
                3) "user_id"
                4) "1001"
                5) "email"
                6) "user@example.com"
    
    > XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications >
    1) 1) "notifications"
       2) 1) 1) "1647345678912-0"
             2) 1) "type"
                2) "user_signup"
                3) "user_id"
                4) "1001"
                5) "email"
                6) "user@example.com"
    
    > XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications >
    1) 1) "notifications"
       2) 1) 1) "1647345678912-0"
             2) 1) "type"
                2) "user_signup"
                3) "user_id"
                4) "1001"
                5) "email"
                6) "user@example.com"

    Java Spring Boot示例

    @Service
    p编程ublic class FanOutService {
        
        @Autowired
        private StringRedisTemplate redisTemplate;
        
        /**
         * 初始化扇出消费者组
         */
        public void initializeFanOutGroups(String streamKey, List<String> groupNames) {
            // 确保流存在
            try {
                StreamInfo.XInfoStream info = redisTemplate.opsForStream().info(streamKey);
            } catch (Exception e) {
                // 流不存在,发送一个初始消息
                Map<String, Object> initialMessage = new HashMap<>();
                initialMessage.put("init", "true");
                redisTemplate.opsForStream().add(streamKey, initialMessage);
            }
            
            // 创建所有消费者组
            for (String groupName : groupNames) {
                try {
                    redisTemplate.opsForStream().createGroup(streamKey, groupName);
                } catch (Exception e) {
                    // 忽略组已存在的错误
                    log.info("Group {} may already exist: {}", groupName, e.getMessage());
                }
            }
        }
        
        /**
         * 发布扇出消息
         */
        public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) {
            StringRecord record = StreamRecords.string(messageData).withStreamKey(streamKey);
            return redisTemplate.opsForStream().add(record).getValue();
        }
        
        /**
         * 为特定组启动消费者
         */
        @Async
        public void startGroupConsumer(
                String streamKey, String groupName, String consumerName, 
                Consumer<MapRecord<String, Object, Object>> messageHandler) {
            
            StreamReadOptions options = StreamReadOptions.empty().count(10).block(Duration.ofSeconds(2));
            
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
                            Consumer.from(groupName, consumerName),
                            options,
                            StreamOffset.create(streamKey, ReadOffset.lastConsumed())
                    );
                    
                    if (messages != null && !messages.isEmpty()) {
                        for (MapRecord<String, Object, Object> message : messages) {
                            try {
                                // 处理消息
                                messageHandler.accept(message);
                                
                                // 确认消息
                                redisTemplate.opsForStream().acknowledge(
                                        streamKey, groupName, message.getId().getValue());
                            } catch (Exception e) {
                                log.error("Error processing message in group {}: {}", 
                                        groupName, e.getMessage(), e);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("Error reading from stream for group {}: {}", 
                            groupName, e.getMessage(), e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    使用示例

    @Service
    public class NotificationService {
        
        @Autowired
        private FanOutService fanOutService;
        
        @PostConstruct
        public void init() {
            // 初始化扇出组
            List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group");
            fanOutService.initializeFanOutGroups("user-events", groups);
            
            // 启动各个消费者组的处理器
            fanOutService.startGroupConsumer(
                    "user-events", "email-group", "email-consumer", this::processEmailNotification);
            
            fanOutService.startGroupConsumer(
                    "user-events", "sms-group", "sms-consumer", this::processSmsNotification);
            
            fanOutService.startGroupConsumer(
                    "user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent);
        }
        
        private void processEmailNotification(MapRecord<String, Object, Object> message) {
            Map<Object, Object> messageData = message.getValue();
            log.info("Processing email notification: {}", messageData);
            // 邮件发送逻辑
        }
        
        private void processSmsNotification(MapRecord<String, Object, Object> message) {
            Map<Object, Object> messageData = message.getValue();
            log.info("Processing SMS notification: {}", messageData);
            // 短信发送逻辑
        }
        
        private void processAnalyticsEvent(MapRecord<String, Object, Object> message) {
            Map<Object, Object> messageData = message.getValue();
            log.info("Processing analytics event: {}", messageData);
            // 分析事件处理逻辑
        }
        
        public void publishUserEvent(String eventType, Map<String, Object> eventData) {
            Map<String, Object> message = new HashMap<>(eventData);
            message.put("event_type", eventType);
            message.put("timestamp", System.currentTimeMillis());
            
            fanOutService.publishFanOutMessage("user-events", message);
        }
    }

    使用场景

    • 多个系统需要独立处理同一事件流
    • 实现事件广播机制
    • 系统集成:一个事件触发多个业务流程
    • 日志统一处理并分发到不同服务
    • 通知系统:一个事件需要通过多种方式通知用户

    优缺点

    优点

    • 实现一次发布多次消费
    • 各消费者组独立工作,互不影响
    • 新增消费者组可以从头开始消费所有历史消息
    • 可靠性高,消息持久化存储

    缺点

    • 随着流数据增长,可能占用较多存储空间
    • 需要合理设置流的最大长度或过期策略
    • 消费者组数量过多可能增加Redis负载
    • 需要单独管理每个消费者组的状态

    5. 重试与恢复模式(Retry and Recovery)

    基本概念

    这种模式关注处理失败消息的恢复和重试机制。Redis Stream消费者组会跟踪每个消息的处理状态,允许查看和管理未确认(PEL - Pending Entry List)的消息,实现可靠的消息处理。

    核心命令

    # 查看消费者组中未确认的消息
    XPENDING stream_name group_name [start_id end_id count] [consumer_name]
    
    # 查看消费者组中长时间未确认的消息详情
    XPENDING stream_name group_name start_id end_id count [consumer_name]
    
    # 认领处理超时的消息
    XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]

    实现示例

    Redis CLI

    # 查看未确认的消息数量
    > XPENDING mystream processing-group
    1) (integer) 2         # 未确认消息数量
    2) "1647257548956-0"   # 最小ID
    3) "1647257549123-0"   # 最大ID
    4) 1) 1) "consumer-1"  # 各个消费者的未确认消息数
          2) (integer) 1
       2) 1) "consumer-2"
          2) (integer) 1
    
    # 查看特定消费者的未确认消息
    > XPENDING mystream processing-group - + 10 consumer-1
    1) 1) "1647257548956-0"   # 消息ID
       2) "consumer-1"         # 当前持有的消费者
       3) (integer) 120000     # 空闲时间(毫秒)
       4) (integer) 2          # 传递次数
    
    # 认领超过2分钟未处理的消息
    > XCLAIM myandroidstream processing-group consumer-2 120000 1647257548956-0
    1) 1) "1647257548956-0"
       2) 1) "sensor_id"
          2) "1234"
          3) "temperature"
          4) "19.8"
          5) "humidity"
          6) "56"

    Java Spring Boot示例

    @Service
    public class MessageRecoveryService {
        
        @Autowired
        private StringRedisTemplate redisTemplate;
        
        /**
         * 获取消费者组中的未确认消息
         */
        public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) {
            return redisTemplate.opsForStream().pending(streamKey, groupName);
        }
        
        /**
         * 获取指定消费者的详细未确认消息
         */
        public PendingMessages getPendingMessages(
                String streamKey, String groupName, String consumerName, 
                Range<String> idRange, long count) {
            
            return redisTemplate.opsForStream().pending(
                    streamKey, 
                    Consumer.from(groupName, consumerName), 
                    idRange, 
                    count);
        }
        
        /**
         * 认领长时间未处理的消息
         */
        public List<MapRecord<String, Object, Object>> claimMessages(
                String streamKey, String groupName, String newConsumerName, 
                Duration minIdleTime, String... messageIds) {
            
            return redisTemplate.opsForStream().claim(
                    streamKey, 
                    Consumer.from(groupName, newConsumerName), 
                    minIdleTime, 
                    messageIds);
        }
        
        /**
         * 定时检查和恢复未处理的消息
         */
        @Scheduled(fixedRate = 60000) // 每分钟执行一次
        public void recoverStaleMessages() {
            // 配置参数
            String streamKey = "mystream";
            String groupName = "processing-group";
            String recoveryConsumer = "recovery-consumer";
            Duration minIdleTime = Duration.ofMinutes(5); // 超过5分钟未处理的消息
            
            try {
                // 1. 获取所有未确认消息的摘要
                PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName);
                
                if (summary != null && summary.getTotalPendingMessages() > 0) {
                    // 2. 遍历每个消费者的未确认消息
                    for (Consumer consumer : summary.getPendingMessagesPerConsumer().keySet()) {
                        // 获取该消费者的详细未确认消息列表
                        PendingMessages pendingMessages = getPendingMessages(
                                streamKey, groupName, consumer.getName(), 
                                Range.unbounded(), 50); // 每次最多处理50条
                        
                        if (pendingMessages != null) {
                            // 3. 筛选出空闲时间超过阈值的消息
                            List<String> staleMessageIds = new ArrayList<>();
                            
                            for (PendingMessage message : pendingMessages) {
                                if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) {
                                    staleMessageIds.add(message.getIdAsString());
                                }
                            }
                            
                            // 4. 认领这些消息
                            if (!staleMessageIds.isEmpty()) {
                                log.info("Claiming {} stale messages from consumer {}", 
                                        staleMessageIds.size(), consumer.getName());
                                
                                List<MapRecord<String, Object, Object>> claimedMessages = claimMessages(
                                        streamKey, groupName, recoveryConsumer, minIdleTime, 
                                        staleMessageIds.toArray(new String[0]));
                                
                                // 5. 处理这些被认领的消息
                                processClaimedMessages(streamKey, groupName, claimedMessages);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error recovering stale messages: {}", e.getMessage(), e);
            }
        }
        
        /**
         * 处理被认领的消息
         */
        private void processClaimedMessages(
                String streamKey, String groupName, 
                List<MapRecord<String, Object, Object>> messages) {
            
            if (messages == null || messages.isEmpty()) {
                return;
            }
            
            for (MapRecord<String, Object, Object> message : messages) {
                try {
                    // 执行消息处理逻辑
                    processMessage(message);
                    
                    // 确认消息
                    redisTemplate.opsForStream().acknowledge(
                            streamKey, groupName, message.getId().getValue());
                    
                    log.info("Successfully processed recovered message: {}", message.getId());
                } catch (Exception e) {
                    log.error("Failed to process recovered message {}: {}", 
                            message.getId(), e.getMessage(), e);
                    // 根据业务需求决定是否将消息加入死信队列
                    moveToDeadLetterQueue(streamKey, message);
                }
            }
        }
        
        /**
         * 将消息移至死信队列
         */
        private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) {
            String deadLetterStream = sourceStream + ":dead-letter";
            Map<Object, Object> messageData = message.getValue();
            
            Map<String, Object> dlqMessage = new HashMap<>();
            messageData.forEach((k, v) -> dlqMessage.put(k.toString(), v));
            
            // 添加元数据
            dlqMessage.put("original_id", message.getId().getValue());
            dlqMessage.put("error_time", System.currentTimeMillis());
            
            redisTemplate.opsForStream().add(deadLetterStream, dlqMessage);
            
            // 可选:从原消费者组确认该消息
            // redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue());
        }
        
        private void processMessage(MapRecord<String, Object, Object> message) {
            // 实际的消息处理逻辑
            log.info("Processing recovered message: {}", message);
            // ...
        }
    }

    使用场景

    • 需要可靠消息处理的关键业务系统
    • 处理时间较长的任务
    • 需要错误重试机制的工作流
    • 监控和诊断消息处理过程
    • 实现死信队列处理特定失败场景

    优缺点

    优点

    • 提高系统容错性和可靠性
    • 自动恢复因消费者崩溃导致的未处理消息
    • 可以识别和处理长时间未确认的消息
    • 支持实现复杂的重试策略和死信处理

    缺点

    • 需要额外开发和维护恢复机制
    • 可能导致消息重复处理,需要确保业务逻辑幂等
    • 系统复杂度增加
    • 需要监控和管理PEL(未确认消息列表)的大小

    6. 流处理窗口模式(Streaming Window Processing)

    基本概念

    流处理窗口模式基于时间或消息计数划分数据流,在每个窗口内执行聚合或分析操作。这种模式适用于实时分析、趋势监测和时间序列处理。虽然Redis Stream本身不直接提供窗口操作,但可以结合Redis的其他特性实现。

    实现方式

    主要通过以下几种方式实现:

    1. 基于消息ID的时间范围(Redis消息ID包含毫秒时间戳)

    2. 结合Redis的排序集合(SortedSet)存储窗口数据

    3. 使用Redis的过期键实现滑动窗口

    实现示例

    Redis CLI

    窗口数据收集与查询:

    # 添加带时间戳的数据
    > XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000
    "1647257550123-0"
    > XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000
    "1647257560234-0"
    > XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000
    "1647257570345-0"
    
    # 查询特定时间范围的数据
    > XRANGE temperature 1647257550000-0 1647257570000-0
    1) 1) "1647257550123-0"
       2) 1) "sensor_id"
          2) "1"
          3) "value"
          4) "21.5"
          5) "timestamp"
          6) "1647257548000"
    2) 1) "1647257560234-0"
       2) 1) "sensor_id"
          2) "1"
          3) "value"
          4) "21.8"
          5) "timestamp"
          6) "1647257558000"

    Java Spring Boot示例

    @Service
    public class TimeWindowprocessingService {
        
        @Autowired
        private StringRedisTemplate redisTemplate;
        
        /**
         * 添加数据点到流,并存储到相应的时间窗口
         */
        public String addDataPoint(String streamKey, String sensorId, double value) {
            long timestamp = System.currentTimeMillis();
            
            // 1. 添加到原始数据流
            Map<String, Object> dataPoint = new HashMap<>();
            dataPoint.put("sensor_id", sensorId);
            dataPoint.put("value", String.valueOf(value));
            dataPoint.put("timestamp", String.valueOf(timestamp));
            
            StringRecord record = StreamRecords.string(dataPoint).withStreamKey(streamKey);
            RecordId recordId = redisTemplate.opsForStream().add(record);
            
            // 2. 计算所属的窗口(这里以5分钟为一个窗口)
            long Windowstart = timestamp - (timestamp % (5 * 60 * 1000));
            String windowKey = streamKey + ":window:" + windowStart;
            
            // 3. 将数据点添加到窗口的有序集合中,分数为时间戳
            String dataPointjson = new ObjectMapper().writeValueAsString(dataPoint);
            redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp);
            
            // 4. 设置窗口键的过期时间(保留24小时)
            redisTemplate.expire(windowKey, Duration.ofHours(24));
            
            return recordId.getValue();
        }
        
        /**
         * 获取指定时间窗口内的数据点
         */
        public List<Map<String, Object>> getWindowData(
                String streamKey, long windowStartTime, long windowEndTime) {
            
            // 计算可能的窗口键(每5分钟一个窗口)
            List<String> windowKeys = new ArrayList<>();
            long current = windowStartTime - (windowStartTime % (5 * 60 * 1000));
            
            while (current <= windowEndTime) {
                windowKeys.add(streamKey + ":window:" + current);
                current += (5 * 60 * 1000);
            }
            
            // 从各个窗口获取数据点
            List<Map<String, Object>> results = new ArrayList<>();
            ObjectMapper mapper = new ObjectMapper();
            
            for (String windowKey : windowKeys) {
                Set<String> dataPoints = redisTemplate.opsForZSet().rangeByScore(
                        windowKey, windowStartTime, windowEndTime);
                
                if (dataPoints != null) {
                    for (String dataPointJson : dataPoints) {
                        try {
                            Map<String, Object> dataPoint = mapper.readValue(
                                    dataPointJson, new TypeReference<Map<String, Object>>() {});
                            results.add(dataPoint);
                        } catch (Exception e) {
                            log.error("Error parsing data point: {}", e.getMessage(), e);
                        }
                    }
                }
            }
            
            // 按时间戳排序
            results.sort(Comparator.comparing(dp -> Long.parseLong(dp.get("timestamp").toString())));
            
            return results;
        }
        
        /**
         * 计算窗口内数据的聚合统计
         */
        public Map<String, Object> getWindowStats(
                String streamKey, String sensorId, long windowStartTime, long windowEndTime) {
            
            List<Map<String, Object>> windowData = getWindowData(streamKey, windowStartTime, windowEndTime);
            
            // 过滤特定传感器的数据
            List<Double> values = windowData.stream()
                    .filter(dp -> sensorId.equals(dp.get("sensor_id").toString()))
                    .map(dp -> Double.parseDouble(dp.get("value").toString()))
                    .collect(Collectors.toList());
            
            Map<String, Object> stats = new HashMap<>();
            stats.put("count", values.size());
            
            if (!values.isEmpty()) {
                DoubleSummaryStatistics summaryStats = values.stream().collect(Collectors.summarizingDouble(v -> v));
                stats.put("min", summaryStats.getMin());
                stats.put("max", summaryStats.getMax());
                stats.put("avg", summaryStats.getAverage());
                stats.put("sum", summaryStats.getSum());
            }
            
            stats.put("start_time", windowStartTime);
            stats.put("end_time", windowEndTime);
            stats.put("sensor_id", sensorId);
            
            return stats;
        }
        
        /**
         * 实现滑动窗口处理
         */
        @Scheduled(fixedRate = 60000) // 每分钟执行一次
        public void processSlidingWindows() {
            String streamKey = "temperature";
            long now = System.currentTimeMillis();
            
            // 处理过去10分钟窗口的数据
            long windowEndTime = now;
            long windowStartTime = now - (10 * 60 * 1000);
            
            List<String> sensorIds = Arrays.asList("1", "2", "3"); // 示例传感器ID
            
            for (String sensorId : sensorIds) {
                try {
                    // 获取窗口统计
                    Map<String, Object> stats = getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime);
                    
                    // 根据统计结果执行业务逻辑
                    if (stats.containsKey("avg")) {
                        double avgTemp = (double) stats.get("avg");
                        if (avgTemp > 25.0) {
                            // 触发高温警报
                            log.warn("High temperature alert for sensor {}: {} C", sensorId, avgTemp);
                            triggerAlert(sensorId, "HIGH_TEMP", avgTemp);
                        }
                    }
                    
                    // 存储聚合结果用于历史趋势分析
                    saveAggregatedResults(streamKey, sensorId, stats);
                    
                } catch (Exception e) {
                    log.error("Error processing sliding window for sensor {}: {}", 
                            sensorId, e.getMessage(), e);
                }
            }
        }
        
        /**
         * 触发警报
         */
        private void triggerAlert(String sensorId, String alertType, double value) {
            Map<String, Object> alertData = new HashMap<>();
            alertData.put("sensor_id", sensorId);
            alertData.put("alert_type", alertType);
            alertData.put("value", value);
            alertData.put("timestamp", System.currentTimeMillis());
            
            redisTemplate.opsForStream().add("alerts", alertData);
        }
        
        /**
         * 保存聚合结果
         */
        private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) {
            long windowTime = www.devze.com(long) stats.get("end_time");
            String aggregateKey = streamKey + ":aggregate:" + sensorId;
            
            // 使用时间作为分数存储聚合结果
            redisTemplate.opsForZSet().add(
                    aggregateKey, 
                    new ObjectMapper().writeValueAsString(stats),
                    windowTime);
            
            // 保留30天的聚合数据
            redisTemplate.expire(aggregateKey, Duration.ofDays(30));
        }
    }

    使用场景

    • 实时数据分析与统计
    • 趋势检测和预测
    • 异常值和阈值监控
    • 时间序列数据处理
    • IoT数据流处理和聚合
    • 用户行为分析

    优缺点

    优点

    • 支持基于时间的数据分析
    • 可以实现实时聚合和计算
    • 灵活的窗口定义(滑动窗口、滚动窗口)
    • 可扩展以支持复杂的分析场景

    缺点

    • 实现复杂度较高
    • 可能需要额外的数据结构和存储空间
    • 对于大数据量的窗口计算可能影响性能
    • 需要小心管理内存使用和数据过期策略

    结论

    Redis Stream提供了强大而灵活的消息处理功能,通过组合这些模式,可以构建出高性能、可靠且灵活的消息处理系统,满足从简单的任务队列到复杂的实时数据处理等各种应用需求。

    在选择和实现这些模式时,应充分考虑业务特性、性能需求、可靠性要求以及系统规模,结合Redis Stream的特性,打造最适合自己应用场景的消息处理解决方案。

    以上就是一文带你搞懂Redis Stream的6种消息处理模式的详细内容,更多关于Redis Stream消息处理模式的资料请关注编程客栈(www.devze.com)其它相关文章!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号