i18nsite opened a new pull request, #3278:
URL: https://github.com/apache/kvrocks/pull/3278

   # feat(stream): Add support for XREADGROUP CLAIM option
   
   [English](#en) / [中文](#cn)
   
   ---
   
   <a id="en"></a>
   ## English
   
   ### Overview
   
   This PR implements the `CLAIM` option for the `XREADGROUP` command, enabling 
consumers to claim idle pending messages from other consumers in a consumer 
group. This feature is essential for handling failed or slow consumers in 
distributed stream processing scenarios.
   
   ### Motivation
   
   In distributed stream processing with consumer groups, consumers may fail or 
become slow, causing messages to remain in their Pending Entries List (PEL) 
indefinitely. The `CLAIM` option allows other consumers to take over these idle 
messages, improving fault tolerance and ensuring timely message processing.
   
   ### Changes
   
   #### Core Implementation
   
   1. **Command Parsing** (`src/commands/cmd_stream.cc`)
      - Added `CLAIM min-idle-time` parameter parsing with validation
      - Implemented smart response format selection (2-element vs 4-element)
      - Updated `AddStreamEntriesToResponse` for conditional formatting
   
   2. **Stream Processing** (`src/types/redis_stream.cc`)
      - Implemented comprehensive claiming logic in `Stream::RangeWithPending`
      - Added idle time calculation and message ownership transfer
      - Implemented ordering guarantees (idle entries first, ordered by idle 
time)
      - Optimized consumer metadata management
      - Fixed PEL iteration to respect `exclude_start` option
   
   3. **Data Structures** (`src/types/redis_stream_base.h`, 
`src/types/redis_stream.h`)
      - Extended `StreamEntry` with `idle_ms` and `delivery_count` fields
      - Created `StreamReadGroupReadOptions` struct for CLAIM parameters
      - Updated `RangeWithPending` signature
   
   #### Key Features
   
   - ✅ **Extended Reply Format**: Claimed entries return `[id, fields, idle_ms, 
delivery_count]`
   - ✅ **Ordering Guarantees**: Idle entries first (longest idle first), then 
new entries
   - ✅ **NOACK Interaction**: Claimed entries still added to PEL (per Redis 
spec)
   - ✅ **Min-Idle-Time Filtering**: Only claim messages exceeding threshold
   - ✅ **Delivery Count Tracking**: Accurate increment and reporting
   
   #### Test Coverage
   
   Comprehensive test suite in 
`tests/gocase/unit/type/stream/xreadgroup_test.go`:
   
   1. **Basic CLAIM**: Message claiming and ownership transfer
   2. **Ordering Guarantees**: Validates idle-first ordering by idle time
   3. **NOACK Interaction**: Verifies claimed entries remain in PEL
   4. **Min-Idle-Time Filtering**: Tests threshold-based claiming
   5. **Delivery Count**: Validates accurate increment tracking
   
   All tests passing ✅
   
   ### Usage Example
   
   ```bash
   # Consumer1 reads messages
   XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >
   
   # Consumer2 claims messages idle for > 5000ms
   XREADGROUP GROUP mygroup consumer2 COUNT 10 CLAIM 5000 STREAMS mystream >
   
   # Response format:
   # [[mystream, [
   #   [entry_id, [field, value, ...], idle_ms, delivery_count],
   #   ...
   # ]]]
   ```
   
   ### Redis Specification Compliance
   
   This implementation follows the Redis protocol specification:
   - CLAIM option accepts min-idle-time in milliseconds
   - Returns entries in extended format with idle_ms and delivery_count
   - Correctly updates PEL and consumer metadata
   - Compatible with `>` (latest) ID and NOACK option
   - Maintains ordering guarantees
   
   ### Testing
   
   ```bash
   # Run XREADGROUP CLAIM tests
   ./test.sh
   
   # All tests passing:
   # --- PASS: TestXReadGroup (3.31s)
   #     --- PASS: XREADGROUP_with_CLAIM_option (0.10s)
   #     --- PASS: XREADGROUP_CLAIM_ordering_guarantees (0.21s)
   #     --- PASS: XREADGROUP_CLAIM_with_NOACK (0.10s)
   #     --- PASS: XREADGROUP_CLAIM_min_idle_time_filter (0.05s)
   #     --- PASS: XREADGROUP_CLAIM_delivery_count_increment (0.10s)
   ```
   
   ### Performance
   
   - **Time Complexity**: O(N) where N is the COUNT parameter
   - **Space Complexity**: O(N) for returned entries
   - **Database Operations**: Single batched write for all PEL updates (atomic)
   
   ---
   
   <a id="cn"></a>
   ## 中文
   
   ### 概述
   
   本PR为 `XREADGROUP` 命令实现了 `CLAIM` 
选项,使消费者能够从消费者组中的其他消费者那里认领空闲的待处理消息。此功能对于处理分布式流处理场景中失败或缓慢的消费者至关重要。
   
   ### 动机
   
   在使用消费者组进行分布式流处理时,消费者可能会失败或变慢,导致消息无限期地保留在其待处理条目列表(PEL)中。`CLAIM` 
选项允许其他消费者接管这些空闲消息,提高容错性并确保及时处理消息。
   
   ### 更改内容
   
   #### 核心实现
   
   1. **命令解析** (`src/commands/cmd_stream.cc`)
      - 添加了 `CLAIM min-idle-time` 参数解析和验证
      - 实现智能响应格式选择(2元素 vs 4元素)
      - 更新 `AddStreamEntriesToResponse` 以支持条件格式化
   
   2. **流处理** (`src/types/redis_stream.cc`)
      - 在 `Stream::RangeWithPending` 中实现了完整的认领逻辑
      - 添加了空闲时间计算和消息所有权转移
      - 实现了排序保证(空闲条目优先,按空闲时间排序)
      - 优化了消费者元数据管理
      - 修复了PEL迭代以遵守 `exclude_start` 选项
   
   3. **数据结构** (`src/types/redis_stream_base.h`, `src/types/redis_stream.h`)
      - 扩展 `StreamEntry` 增加 `idle_ms` 和 `delivery_count` 字段
      - 创建 `StreamReadGroupReadOptions` 结构体用于传递CLAIM参数
      - 更新 `RangeWithPending` 函数签名
   
   #### 关键特性
   
   - ✅ **扩展回复格式**: 认领的条目返回 `[id, fields, idle_ms, delivery_count]`
   - ✅ **排序保证**: 空闲条目优先(最长空闲时间优先),然后是新条目
   - ✅ **NOACK交互**: 认领的条目仍然添加到PEL(符合Redis规范)
   - ✅ **最小空闲时间过滤**: 仅认领超过阈值的消息
   - ✅ **投递计数跟踪**: 准确的增量和报告
   
   #### 测试覆盖
   
   `tests/gocase/unit/type/stream/xreadgroup_test.go` 中的完整测试套件:
   
   1. **基础CLAIM**: 消息认领和所有权转移
   2. **排序保证**: 验证按空闲时间的空闲优先排序
   3. **NOACK交互**: 验证认领的条目保留在PEL中
   4. **最小空闲时间过滤**: 测试基于阈值的认领
   5. **投递计数**: 验证准确的增量跟踪
   
   所有测试通过 ✅
   
   ### 使用示例
   
   ```bash
   # 消费者1读取消息
   XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >
   
   # 消费者2认领空闲时间 > 5000ms 的消息
   XREADGROUP GROUP mygroup consumer2 COUNT 10 CLAIM 5000 STREAMS mystream >
   
   # 响应格式:
   # [[mystream, [
   #   [entry_id, [field, value, ...], idle_ms, delivery_count],
   #   ...
   # ]]]
   ```
   
   ### Redis规范合规性
   
   此实现遵循Redis协议规范:
   - CLAIM选项接受以毫秒为单位的最小空闲时间
   - 以扩展格式返回包含idle_ms和delivery_count的条目
   - 正确更新PEL和消费者元数据
   - 与 `>` (latest) ID和NOACK选项兼容
   - 维护排序保证
   
   ### 测试
   
   ```bash
   # 运行XREADGROUP CLAIM测试
   ./test.sh
   
   # 所有测试通过:
   # --- PASS: TestXReadGroup (3.31s)
   #     --- PASS: XREADGROUP_with_CLAIM_option (0.10s)
   #     --- PASS: XREADGROUP_CLAIM_ordering_guarantees (0.21s)
   #     --- PASS: XREADGROUP_CLAIM_with_NOACK (0.10s)
   #     --- PASS: XREADGROUP_CLAIM_min_idle_time_filter (0.05s)
   #     --- PASS: XREADGROUP_CLAIM_delivery_count_increment (0.10s)
   ```
   
   ### 性能
   
   - **时间复杂度**: O(N),其中N是COUNT参数
   - **空间复杂度**: O(N),用于返回的条目
   - **数据库操作**: 所有PEL更新的单次批量写入(原子操作)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to