yihua commented on code in PR #18224:
URL: https://github.com/apache/hudi/pull/18224#discussion_r3037295633


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.KinesisSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KinesisDeaggregator;
+import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen;
+import org.apache.hudi.utilities.streamer.StreamContext;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
+import 
software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+
+@Slf4j
+public abstract class KinesisSource<T> extends Source<T> {
+
+  protected static final String METRIC_NAME_KINESIS_MESSAGE_IN_COUNT = 
"kinesisMessageInCount";
+
+  protected final HoodieIngestionMetrics metrics;
+  protected final SchemaProvider schemaProvider;
+  protected KinesisOffsetGen offsetGen;
+  protected final boolean shouldAddMetaFields;
+  /** Checkpoint data (shardId -> sequenceNumber) collected during toBatch 
execution. Set by subclasses. */
+  protected Map<String, String> lastCheckpointData;
+
+  protected KinesisSource(TypedProperties props, JavaSparkContext 
sparkContext, SparkSession sparkSession,
+                          SourceType sourceType, HoodieIngestionMetrics 
metrics, StreamContext streamContext) {
+    super(props, sparkContext, sparkSession, sourceType, streamContext);
+    this.schemaProvider = streamContext.getSchemaProvider();
+    this.metrics = metrics;
+    this.shouldAddMetaFields = getBooleanWithAltKeys(props, 
KinesisSourceConfig.KINESIS_APPEND_OFFSETS);
+  }
+
+  @Override
+  protected final InputBatch<T> fetchNewData(Option<String> lastCkptStr, long 
sourceLimit) {
+    throw new UnsupportedOperationException("KinesisSource#fetchNewData should 
not be called");
+  }
+
+  @Override
+  protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> 
lastCheckpoint, long sourceLimit) {
+    // STEP 1: Collect all available shards for the stream: open/closed shards.
+    KinesisOffsetGen.KinesisShardRange[] allOpenClosedShardRanges = 
offsetGen.getNextShardRanges(lastCheckpoint, sourceLimit);
+    // STEP 2: Filter out shards with no unread records to avoid unnecessary 
GetRecords calls.
+    boolean useLatestStartingPositionStrategy =
+        offsetGen.getStartingPositionStrategy() == 
KinesisSourceConfig.KinesisStartingPositionStrategy.LATEST;
+    int numShardsBeforeFilter = allOpenClosedShardRanges.length;
+    KinesisOffsetGen.KinesisShardRange[] shardRangesWithUnreadRecords = 
Arrays.stream(allOpenClosedShardRanges)
+        .filter(range -> 
range.hasUnreadRecords(useLatestStartingPositionStrategy))
+        .toArray(KinesisOffsetGen.KinesisShardRange[]::new);
+    if (numShardsBeforeFilter > shardRangesWithUnreadRecords.length) {
+      log.info("Filtered {} shards with no unread records, {} shards remain",
+          numShardsBeforeFilter - shardRangesWithUnreadRecords.length, 
shardRangesWithUnreadRecords.length);
+    }
+    // When nothing to read, return empty batch and previous checkpoint if any.
+    if (shardRangesWithUnreadRecords.length == 0) {
+      
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT,
 0);
+      String checkpointStr = lastCheckpoint.isPresent() ? 
lastCheckpoint.get().getCheckpointKey() : "";
+      return new InputBatch<>(Option.empty(), checkpointStr);
+    }
+    // STEP 3: Otherwise, do the read.
+    T batch = toBatch(shardRangesWithUnreadRecords, sourceLimit);
+    // STEP 4: Generate checkpoint.
+    // Pass allOpenClosedShardRanges so filtered-out shards are preserved in 
the checkpoint; otherwise
+    // next run would re-read them from TRIM_HORIZON and cause duplicates
+    String checkpointStr = createCheckpointFromBatch(batch, 
shardRangesWithUnreadRecords, allOpenClosedShardRanges);
+    // STEP 5: Emit metrics.
+    long totalMsgs = getRecordCount(batch);
+    
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT,
 totalMsgs);
+    log.info("Read {} records from Kinesis stream {} with {} shards, 
checkpoint: {}",
+        totalMsgs, offsetGen.getStreamName(), 
shardRangesWithUnreadRecords.length, checkpointStr);
+
+    return new InputBatch<>(Option.of(batch), checkpointStr);
+  }
+
+  /** Upper bound on consecutive empty GetRecords responses before giving up 
on a shard. */
+  private static final int MAX_EMPTY_RESPONSES_FROM_GET_RECORDS = 100;
+
+  /**
+   * Lazy iterator over records from a single Kinesis shard.
+   *
+   * <p>Records are fetched one GetRecords page at a time; the next page is 
only requested once all
+   * records from the current page have been consumed. This avoids holding the 
full shard batch in
+   * executor memory simultaneously with the caller's output collection.
+   *
+   * <p>After {@link #hasNext()} returns {@code false} callers must read
+   * {@link #getLastSequenceNumber()} and {@link #isReachedEndOfShard()} to 
obtain checkpoint state.
+   *
+   * <p><b>lastSequenceNumber correctness invariant:</b> the sequence number 
is taken from the last
+   * <em>raw</em> Kinesis record (pre-deaggregation) of a page and is only 
committed once all
+   * deaggregated records from that page have been yielded. This guarantees 
the checkpoint never
+   * advances past records that have not yet been returned to the caller.
+   */
+  public static class ShardRecordIterator implements Iterator<Record> {
+    private final KinesisClient client;
+    private final String shardId;
+    private final int maxRecordsPerRequest;
+    private final long requestIntervalMs;
+    private final long maxTotalRecords;
+    private final boolean enableDeaggregation;
+    private final long retryInitialIntervalMs;
+    private final long retryMaxIntervalMs;
+    private final long throttleTimeoutMs;
+
+    /** Current position in the Kinesis shard; null means the shard is 
exhausted. */
+    private String shardIteratorStr;
+    /** Records from the most recently fetched page, ready to be yielded. */
+    private Iterator<Record> currentPage = Collections.emptyIterator();
+    /**
+     * Raw lastSeq of the page currently being consumed. Moved to {@link 
#lastSequenceNumber} only
+     * when the page iterator is fully exhausted, ensuring the checkpoint 
never skips records.
+     */
+    private String pendingPageLastSeq = null;
+    /** Checkpoint-safe lastSeq: reflects only fully-consumed pages. */
+    private String lastSequenceNumber = null;
+    private boolean reachedEndOfShard = false;
+    /** True once no further GetRecords calls should be made. */
+    private boolean fetchingDone = false;
+    private long totalConsumed = 0;
+    private int emptyPageCount = 0;
+
+    /**
+     * Dynamically tuned records-per-request limit.
+     * Halved on each ProvisionedThroughputExceededException and held there 
for the rest of the shard read.
+     */
+    private int currentMaxRecords;
+    /** Epoch ms of the last successful GetRecords call; used to enforce 
{@link #throttleTimeoutMs}. */
+    private long lastSuccessTimeMs;
+
+    public ShardRecordIterator(String initialShardIterator, KinesisClient 
client, String shardId,
+                               int maxRecordsPerRequest, long 
requestIntervalMs, long maxTotalRecords, boolean enableDeaggregation,
+                               long retryInitialIntervalMs, long 
retryMaxIntervalMs, long throttleTimeoutMs) {
+      this.shardIteratorStr = initialShardIterator;
+      this.client = client;
+      this.shardId = shardId;
+      this.maxRecordsPerRequest = maxRecordsPerRequest;
+      this.requestIntervalMs = requestIntervalMs;
+      this.maxTotalRecords = maxTotalRecords;
+      this.enableDeaggregation = enableDeaggregation;
+      this.retryInitialIntervalMs = retryInitialIntervalMs;
+      this.retryMaxIntervalMs = retryMaxIntervalMs;
+      this.throttleTimeoutMs = throttleTimeoutMs;
+      this.currentMaxRecords = maxRecordsPerRequest;
+      this.lastSuccessTimeMs = System.currentTimeMillis();
+    }
+
+    @Override
+    public boolean hasNext() {
+      while (true) {
+        if (currentPage.hasNext()) {
+          return true;
+        }
+        // Current page fully consumed: commit its lastSeq before moving on.
+        commitPendingPageLastSeq();
+        if (fetchingDone) {
+          return false;
+        }
+        fetchNextPage();
+        // Loop: if the page was empty, try fetching again (up to 
MAX_EMPTY_RESPONSES limit).
+      }
+    }
+
+    @Override
+    public Record next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more records for shard " + 
shardId);
+      }
+      totalConsumed++;
+      return currentPage.next();

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Deaggregation can blow past the configured batch cap.**
   
   `totalConsumed` is only consulted when deciding whether to fetch another 
page. Once Line 281 expands a raw Kinesis page into many user records, Lines 
191-210 will drain that whole page before the quota is checked again. That 
means `hoodie.streamer.source.kinesis.max.events` stops being a real upper 
bound as soon as KPL aggregation is enabled.
   
   
   
   Also applies to: 278-283
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java`
   around lines 191 - 210, The iterator drains an entire deaggregated 
currentPage
   in next() without rechecking the totalConsumed quota, allowing deaggregation 
to
   exceed hoodie.streamer.source.kinesis.max.events; modify hasNext() and 
next() so
   they both consult totalConsumed against the configured max (e.g., maxEvents)
   before yielding a record: when currentPage has elements, do not simply return
   true/unconditionally next() — instead compute remainingAllowed = maxEvents -
   totalConsumed and if remainingAllowed <= 0 treat as no more records; if
   currentPage contains more elements than remainingAllowed, only yield up to
   remainingAllowed (by limiting the per-page iterator or wrapping
   currentPage.next() to stop after remainingAllowed) and ensure
   commitPendingPageLastSeq()/fetchNextPage() behavior still runs when a page is
   fully consumed. Update logic in hasNext(), next(), and any loop that calls
   fetchNextPage()/commitPendingPageLastSeq() to enforce this quota check so
   deaggregation cannot blow past the configured cap (references: hasNext(),
   next(), commitPendingPageLastSeq(), fetchNextPage(), totalConsumed).
   ```
   
   </details>
   
   <!-- 
fingerprinting:phantom:medusa:grasshopper:2d6277d7-75f4-4e28-b60a-1efa18b71895 
-->
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/19#discussion_r3036410525)) 
(source:comment#3036410525)



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKinesisSource.java:
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import java.lang.reflect.Method;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.utilities.config.KinesisSourceConfig.KINESIS_REGION;
+import static 
org.apache.hudi.utilities.config.KinesisSourceConfig.KINESIS_STARTING_POSITION;
+import static 
org.apache.hudi.utilities.config.KinesisSourceConfig.KINESIS_STREAM_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for JsonKinesisSource.
+ */
+class TestJsonKinesisSource extends SparkClientFunctionalTestHarness {
+
+  private static final String STREAM_NAME = "test-stream";
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private TestableJsonKinesisSource source;
+
+  @BeforeEach
+  void setup() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(KINESIS_STREAM_NAME.key(), STREAM_NAME);
+    props.setProperty(KINESIS_REGION.key(), "us-east-1");
+    props.setProperty(KINESIS_STARTING_POSITION.key(), "TRIM_HORIZON");
+
+    source = new TestableJsonKinesisSource(
+        props, jsc(), spark(), null, mock(HoodieIngestionMetrics.class));
+  }
+
+  // --- recordToJsonStatic tests ---
+
+  private static String recordToJson(Record record, String shardId, boolean 
shouldAddOffsets)
+      throws Exception {
+    Method m = JsonKinesisSource.class.getDeclaredMethod(
+        "recordToJsonStatic", Record.class, String.class, boolean.class);
+    m.setAccessible(true);
+    return (String) m.invoke(null, record, shardId, shouldAddOffsets);
+  }
+
+  private static Record kinesisRecord(String data, String partitionKey, String 
sequenceNumber,
+      Instant approximateArrivalTimestamp) {
+    Record.Builder builder = Record.builder()
+        .data(SdkBytes.fromUtf8String(data))
+        .partitionKey(partitionKey)
+        .sequenceNumber(sequenceNumber);
+    if (approximateArrivalTimestamp != null) {
+      builder.approximateArrivalTimestamp(approximateArrivalTimestamp);
+    }
+    return builder.build();
+  }
+
+  @Test
+  void testRecordToJsonPassThroughWhenShouldAddOffsetsFalse() throws Exception 
{
+    String json = "{\"id\":1,\"name\":\"alice\"}";
+    Record record = kinesisRecord(json, "pk1", 
"49590382471490958861609854428592832524486083118",
+        Instant.ofEpochMilli(1700000000000L));
+
+    String result = recordToJson(record, "shardId-000000000000", false);
+
+    assertEquals(json, result);
+  }
+
+  @Test
+  void testRecordToJsonAddsOffsetFieldsWhenShouldAddOffsetsTrue() throws 
Exception {
+    String json = "{\"id\":1,\"name\":\"alice\"}";
+    Record record = kinesisRecord(json, "pk1", 
"49590382471490958861609854428592832524486083118",
+        Instant.ofEpochMilli(1700000000000L));
+
+    String result = recordToJson(record, "shardId-000000000001", true);
+
+    JsonNode node = MAPPER.readTree(result);
+    assertEquals("49590382471490958861609854428592832524486083118",
+        node.get("_hoodie_kinesis_source_sequence_number").asText());
+    assertEquals("shardId-000000000001", 
node.get("_hoodie_kinesis_source_shard_id").asText());
+    assertEquals("pk1", 
node.get("_hoodie_kinesis_source_partition_key").asText());
+    assertEquals(1700000000000L, 
node.get("_hoodie_kinesis_source_timestamp").asLong());
+    assertEquals(1, node.get("id").asInt());
+    assertEquals("alice", node.get("name").asText());
+  }
+
+  @Test
+  void testRecordToJsonEmptyStringReturnsNull() throws Exception {
+    Record record = kinesisRecord("", "pk1", "seq123", Instant.now());
+
+    String result = recordToJson(record, "shardId-0", false);
+
+    assertNull(result);
+  }
+
+  @Test
+  void testRecordToJsonWhitespaceOnlyReturnsNull() throws Exception {
+    Record record = kinesisRecord("   ", "pk1", "seq123", Instant.now());
+
+    String result = recordToJson(record, "shardId-0", false);
+
+    assertNull(result);
+  }
+
+  @Test
+  void testRecordToJsonNullTimestampNotAdded() throws Exception {
+    String json = "{\"id\":1}";
+    Record record = kinesisRecord(json, "pk1", "seq123", null);
+
+    String result = recordToJson(record, "shardId-0", true);
+
+    JsonNode node = MAPPER.readTree(result);
+    assertTrue(node.has("_hoodie_kinesis_source_sequence_number"));
+    assertTrue(node.has("_hoodie_kinesis_source_shard_id"));
+    assertTrue(node.has("_hoodie_kinesis_source_partition_key"));
+    assertEquals(false, node.has("_hoodie_kinesis_source_timestamp"));
+  }
+
+  @Test
+  void testRecordToJsonInvalidJsonWithShouldAddOffsetsReturnsOriginalString() 
throws Exception {
+    String invalidJson = "not valid json {";
+    Record record = kinesisRecord(invalidJson, "pk1", "seq123", Instant.now());
+
+    String result = recordToJson(record, "shardId-0", true);
+
+    assertEquals(invalidJson, result);
+  }

Review Comment:
   <a href="#"><img alt="P0" 
src="https://greptile-static-assets.s3.amazonaws.com/badges/p0.svg?v=7" 
align="top"></a> **Test asserts wrong behavior — will fail at runtime**
   
   The test name says "ReturnsOriginalString", but the production code in 
`JsonKinesisSource.recordToJsonStatic` does the opposite: when 
`shouldAddMetaFields=true` and `OBJECT_MAPPER.readTree(dataStr)` throws a 
`JsonParseException` for invalid JSON, the `catch` block rethrows as 
`HoodieException("Failed to add metadata fields", e)`. That exception 
propagates through reflection as an `InvocationTargetException`, so 
`assertEquals(invalidJson, result)` is never reached.
   
   The test either needs to assert that an exception is thrown:
   ```java
   // Option A – assert exception
   assertThrows(HoodieException.class,
       () -> recordToJson(record, "shardId-0", true));
   ```
   or the production code must be changed to silently fall back (which would 
conflict with the explicit `throw` in the `catch` block and the comment "We can 
disable the flag for mitigation").
   
   — *Greptile* 
([original](https://github.com/yihua/hudi/pull/19#discussion_r3036407228)) 
(source:comment#3036407228)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.KinesisSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KinesisDeaggregator;
+import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen;
+import org.apache.hudi.utilities.streamer.StreamContext;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
+import 
software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+
+@Slf4j
+public abstract class KinesisSource<T> extends Source<T> {
+
+  protected static final String METRIC_NAME_KINESIS_MESSAGE_IN_COUNT = 
"kinesisMessageInCount";
+
+  protected final HoodieIngestionMetrics metrics;
+  protected final SchemaProvider schemaProvider;
+  protected KinesisOffsetGen offsetGen;
+  protected final boolean shouldAddMetaFields;
+  /** Checkpoint data (shardId -> sequenceNumber) collected during toBatch 
execution. Set by subclasses. */
+  protected Map<String, String> lastCheckpointData;
+
+  protected KinesisSource(TypedProperties props, JavaSparkContext 
sparkContext, SparkSession sparkSession,
+                          SourceType sourceType, HoodieIngestionMetrics 
metrics, StreamContext streamContext) {
+    super(props, sparkContext, sparkSession, sourceType, streamContext);
+    this.schemaProvider = streamContext.getSchemaProvider();
+    this.metrics = metrics;
+    this.shouldAddMetaFields = getBooleanWithAltKeys(props, 
KinesisSourceConfig.KINESIS_APPEND_OFFSETS);
+  }
+
+  @Override
+  protected final InputBatch<T> fetchNewData(Option<String> lastCkptStr, long 
sourceLimit) {
+    throw new UnsupportedOperationException("KinesisSource#fetchNewData should 
not be called");
+  }
+
+  @Override
+  protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> 
lastCheckpoint, long sourceLimit) {
+    // STEP 1: Collect all available shards for the stream: open/closed shards.
+    KinesisOffsetGen.KinesisShardRange[] allOpenClosedShardRanges = 
offsetGen.getNextShardRanges(lastCheckpoint, sourceLimit);
+    // STEP 2: Filter out shards with no unread records to avoid unnecessary 
GetRecords calls.
+    boolean useLatestStartingPositionStrategy =
+        offsetGen.getStartingPositionStrategy() == 
KinesisSourceConfig.KinesisStartingPositionStrategy.LATEST;
+    int numShardsBeforeFilter = allOpenClosedShardRanges.length;
+    KinesisOffsetGen.KinesisShardRange[] shardRangesWithUnreadRecords = 
Arrays.stream(allOpenClosedShardRanges)
+        .filter(range -> 
range.hasUnreadRecords(useLatestStartingPositionStrategy))
+        .toArray(KinesisOffsetGen.KinesisShardRange[]::new);
+    if (numShardsBeforeFilter > shardRangesWithUnreadRecords.length) {
+      log.info("Filtered {} shards with no unread records, {} shards remain",
+          numShardsBeforeFilter - shardRangesWithUnreadRecords.length, 
shardRangesWithUnreadRecords.length);
+    }
+    // When nothing to read, return empty batch and previous checkpoint if any.
+    if (shardRangesWithUnreadRecords.length == 0) {
+      
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT,
 0);
+      String checkpointStr = lastCheckpoint.isPresent() ? 
lastCheckpoint.get().getCheckpointKey() : "";
+      return new InputBatch<>(Option.empty(), checkpointStr);
+    }
+    // STEP 3: Otherwise, do the read.
+    T batch = toBatch(shardRangesWithUnreadRecords, sourceLimit);
+    // STEP 4: Generate checkpoint.
+    // Pass allOpenClosedShardRanges so filtered-out shards are preserved in 
the checkpoint; otherwise
+    // next run would re-read them from TRIM_HORIZON and cause duplicates
+    String checkpointStr = createCheckpointFromBatch(batch, 
shardRangesWithUnreadRecords, allOpenClosedShardRanges);
+    // STEP 5: Emit metrics.
+    long totalMsgs = getRecordCount(batch);
+    
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT,
 totalMsgs);
+    log.info("Read {} records from Kinesis stream {} with {} shards, 
checkpoint: {}",
+        totalMsgs, offsetGen.getStreamName(), 
shardRangesWithUnreadRecords.length, checkpointStr);
+
+    return new InputBatch<>(Option.of(batch), checkpointStr);
+  }
+
+  /** Upper bound on consecutive empty GetRecords responses before giving up 
on a shard. */
+  private static final int MAX_EMPTY_RESPONSES_FROM_GET_RECORDS = 100;
+
+  /**
+   * Lazy iterator over records from a single Kinesis shard.
+   *
+   * <p>Records are fetched one GetRecords page at a time; the next page is 
only requested once all
+   * records from the current page have been consumed. This avoids holding the 
full shard batch in
+   * executor memory simultaneously with the caller's output collection.
+   *
+   * <p>After {@link #hasNext()} returns {@code false} callers must read
+   * {@link #getLastSequenceNumber()} and {@link #isReachedEndOfShard()} to 
obtain checkpoint state.
+   *
+   * <p><b>lastSequenceNumber correctness invariant:</b> the sequence number 
is taken from the last
+   * <em>raw</em> Kinesis record (pre-deaggregation) of a page and is only 
committed once all
+   * deaggregated records from that page have been yielded. This guarantees 
the checkpoint never
+   * advances past records that have not yet been returned to the caller.
+   */
+  public static class ShardRecordIterator implements Iterator<Record> {
+    private final KinesisClient client;
+    private final String shardId;
+    private final int maxRecordsPerRequest;
+    private final long requestIntervalMs;
+    private final long maxTotalRecords;
+    private final boolean enableDeaggregation;
+    private final long retryInitialIntervalMs;
+    private final long retryMaxIntervalMs;
+    private final long throttleTimeoutMs;
+
+    /** Current position in the Kinesis shard; null means the shard is 
exhausted. */
+    private String shardIteratorStr;
+    /** Records from the most recently fetched page, ready to be yielded. */
+    private Iterator<Record> currentPage = Collections.emptyIterator();
+    /**
+     * Raw lastSeq of the page currently being consumed. Moved to {@link 
#lastSequenceNumber} only
+     * when the page iterator is fully exhausted, ensuring the checkpoint 
never skips records.
+     */
+    private String pendingPageLastSeq = null;
+    /** Checkpoint-safe lastSeq: reflects only fully-consumed pages. */
+    private String lastSequenceNumber = null;
+    private boolean reachedEndOfShard = false;
+    /** True once no further GetRecords calls should be made. */
+    private boolean fetchingDone = false;
+    private long totalConsumed = 0;
+    private int emptyPageCount = 0;
+
+    /**
+     * Dynamically tuned records-per-request limit.
+     * Halved on each ProvisionedThroughputExceededException and held there 
for the rest of the shard read.
+     */
+    private int currentMaxRecords;
+    /** Epoch ms of the last successful GetRecords call; used to enforce 
{@link #throttleTimeoutMs}. */
+    private long lastSuccessTimeMs;
+
+    public ShardRecordIterator(String initialShardIterator, KinesisClient 
client, String shardId,
+                               int maxRecordsPerRequest, long 
requestIntervalMs, long maxTotalRecords, boolean enableDeaggregation,
+                               long retryInitialIntervalMs, long 
retryMaxIntervalMs, long throttleTimeoutMs) {
+      this.shardIteratorStr = initialShardIterator;
+      this.client = client;
+      this.shardId = shardId;
+      this.maxRecordsPerRequest = maxRecordsPerRequest;
+      this.requestIntervalMs = requestIntervalMs;
+      this.maxTotalRecords = maxTotalRecords;
+      this.enableDeaggregation = enableDeaggregation;
+      this.retryInitialIntervalMs = retryInitialIntervalMs;
+      this.retryMaxIntervalMs = retryMaxIntervalMs;
+      this.throttleTimeoutMs = throttleTimeoutMs;
+      this.currentMaxRecords = maxRecordsPerRequest;
+      this.lastSuccessTimeMs = System.currentTimeMillis();
+    }
+
+    @Override
+    public boolean hasNext() {
+      while (true) {
+        if (currentPage.hasNext()) {
+          return true;
+        }
+        // Current page fully consumed: commit its lastSeq before moving on.
+        commitPendingPageLastSeq();
+        if (fetchingDone) {
+          return false;
+        }
+        fetchNextPage();
+        // Loop: if the page was empty, try fetching again (up to 
MAX_EMPTY_RESPONSES limit).
+      }
+    }
+
+    @Override
+    public Record next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more records for shard " + 
shardId);
+      }
+      totalConsumed++;
+      return currentPage.next();
+    }
+
+    public Option<String> getLastSequenceNumber() {
+      return Option.ofNullable(lastSequenceNumber);
+    }
+
+    public boolean isReachedEndOfShard() {
+      return reachedEndOfShard;
+    }
+
+    private void commitPendingPageLastSeq() {
+      if (pendingPageLastSeq != null) {
+        lastSequenceNumber = pendingPageLastSeq;
+        pendingPageLastSeq = null;
+      }
+    }
+
+    private void fetchNextPage() {
+      if (shardIteratorStr == null || totalConsumed >= maxTotalRecords) {
+        fetchingDone = true;
+        return;
+      }
+      GetRecordsResponse response;
+      int attempt = 0;
+      while (true) {
+        try {
+          response = client.getRecords(
+              GetRecordsRequest.builder()
+                  .shardIterator(shardIteratorStr)
+                  .limit(Math.min(currentMaxRecords, (int) (maxTotalRecords - 
totalConsumed)))
+                  .build());
+          lastSuccessTimeMs = System.currentTimeMillis();
+          break;
+        } catch (ExpiredIteratorException e) {
+          log.warn("Shard iterator expired for {} during GetRecords, stopping 
read", shardId);
+          fetchingDone = true;
+          return;
+        } catch (ProvisionedThroughputExceededException e) {
+          long nowMs = System.currentTimeMillis();
+          if (nowMs - lastSuccessTimeMs > throttleTimeoutMs) {
+            throw new HoodieReadFromSourceException(
+                "Kinesis throughput exceeded for shard " + shardId + ": no 
successful fetch within "
+                    + throttleTimeoutMs + " ms. Last successful fetch, or 
first fetch was " + (nowMs - lastSuccessTimeMs) + " ms ago.", e);
+          }
+          // Halve the per-request limit to reduce pressure; floor at 1.
+          int prevLimit = currentMaxRecords;
+          currentMaxRecords = Math.max(1, currentMaxRecords / 2);
+          // Use attempt count only to compute exponential backoff delay, not 
as a stop condition.
+          long waitMs = Math.min(retryInitialIntervalMs * (1L << 
Math.min(attempt, 30)), retryMaxIntervalMs);
+          waitMs += ThreadLocalRandom.current().nextInt(500);
+          log.warn("Throughput exceeded for shard {}: halving records/request 
from {} to {}, retry after {} ms "
+              + "(no success for {} ms, will give up after {} ms)",
+              shardId, prevLimit, currentMaxRecords, waitMs, nowMs - 
lastSuccessTimeMs, throttleTimeoutMs);
+          try {
+            Thread.sleep(waitMs);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new HoodieReadFromSourceException("Interrupted while backing 
off for shard " + shardId, ie);
+          }
+          attempt++;

Review Comment:
   <a href="#"><img alt="P2" 
src="https://greptile-static-assets.s3.amazonaws.com/badges/p2.svg?v=7" 
align="top"></a> **Lexicographic sequence number comparison may be incorrect 
for different-width values**
   
   `lastSeq.compareTo(endSeq)` is used to detect fully-consumed closed shards. 
Kinesis sequence numbers are numeric decimal strings and are typically 56 
characters long, making lexicographic comparison equivalent to numeric ordering 
for same-width values. However, this is not guaranteed by the Kinesis API 
contract — LocalStack already returns `Long.MAX_VALUE` (19 digits) as `endSeq` 
for closed shards.
   
   When `lastSeq` is a 56-digit real sequence number and `endSeq` is 19 digits 
starting with `9`, the lexicographic comparison `"495..." < "922..."` (`'4' < 
'9'`) correctly identifies the shard as not fully consumed. That case happens 
to work, but only by accident of the particular sentinel value.
   
   Consider documenting explicitly that this comparison relies on the 
assumption that same-shard sequence numbers are always equal-width, or use 
`BigInteger` comparison for correctness:
   ```java
   // Using BigInteger for guaranteed correct numeric comparison
   try {
       return new java.math.BigInteger(lastSeq).compareTo(new 
java.math.BigInteger(endSeq)) >= 0;
   } catch (NumberFormatException e) {
       return false; // conservative: assume not fully consumed
   }
   ```
   
   — *Greptile* 
([original](https://github.com/yihua/hudi/pull/19#discussion_r3036407288)) 
(source:comment#3036407288)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java:
##########
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.KinesisSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
+import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import 
software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+
+/**
+ * Helper for reading from Kinesis Data Streams and managing checkpoints.
+ * Checkpoint format: 
streamName,shardId:sequenceNumber,shardId:sequenceNumber,...
+ */
+@Slf4j
+@Getter
+public class KinesisOffsetGen {
+
+  public static class CheckpointUtils {
+    /** Separator between lastSeq and endSeq for closed shards. Seq numbers 
are numeric, so this is safe. */
+    private static final String END_SEQ_SEPARATOR = "|";
+    /**
+     * Separator between lastSeq and arrivalTime (epoch millis of the record 
with last sequence number).
+     * '@' is used as it is absent from numeric Kinesis sequence numbers and 
visually distinct from '|'.
+     */
+    private static final String ARRIVAL_TIME_SEPARATOR = "@";
+    /**
+     * Kinesis checkpoint pattern.
+     * Format: streamName,shardId:lastSeq[@arrivalTime][|endSeq],...
+     * For closed shards we store lastSeq|endSeq (or 
lastSeq@arrivalTime|endSeq with arrival time)
+     * so we can detect data loss when shard expires.
+     */
+    private static final Pattern PATTERN = Pattern.compile(".*,.*:.*");
+
+    /**
+     * Parse checkpoint string to shardId -> value map.
+     * Value format: lastSeq, or lastSeq@arrivalTime, or lastSeq|endSeq, or 
lastSeq@arrivalTime|endSeq.
+     */
+    public static Map<String, String> strToOffsets(String checkpointStr) {
+      Map<String, String> offsetMap = new HashMap<>();
+      String[] splits = checkpointStr.split(",");
+      for (int i = 1; i < splits.length; i++) {
+        String part = splits[i];
+        int colonIdx = part.indexOf(':');
+        if (colonIdx > 0 && colonIdx < part.length() - 1) {
+          String shardId = part.substring(0, colonIdx);
+          String value = part.substring(colonIdx + 1);
+          offsetMap.put(shardId, value);
+        }
+      }
+      return offsetMap;
+    }
+
+    /**
+     * Extract lastSeq from a checkpoint value.
+     * Handles formats: "lastSeq", "lastSeq|endSeq", "lastSeq@arrivalTime", 
"lastSeq@arrivalTime|endSeq".
+     */
+    public static String getLastSeqFromValue(String value) {
+      if (value == null || value.isEmpty()) {
+        return value;
+      }
+      int arrivalSep = value.indexOf(ARRIVAL_TIME_SEPARATOR);
+      if (arrivalSep >= 0) {
+        return value.substring(0, arrivalSep);
+      }
+      int endSep = value.indexOf(END_SEQ_SEPARATOR);
+      return endSep >= 0 ? value.substring(0, endSep) : value;
+    }
+
+    /**
+     * Extract arrivalTime (epoch millis) from a checkpoint value if present, 
otherwise null.
+     * Handles formats: "lastSeq@arrivalTime" and "lastSeq@arrivalTime|endSeq".
+     */
+    public static Long getArrivalTimeFromValue(String value) {
+      if (value == null || value.isEmpty()) {
+        return null;
+      }
+      int arrivalSep = value.indexOf(ARRIVAL_TIME_SEPARATOR);
+      if (arrivalSep < 0) {
+        return null;
+      }
+      String rest = value.substring(arrivalSep + 
ARRIVAL_TIME_SEPARATOR.length());
+      int endSep = rest.indexOf(END_SEQ_SEPARATOR);
+      String arrivalStr = endSep >= 0 ? rest.substring(0, endSep) : rest;
+      try {
+        return Long.parseLong(arrivalStr);
+      } catch (NumberFormatException e) {
+        return null;
+      }
+    }
+
+    /**
+     * Extract endSeq from a checkpoint value if present. Returns null for 
open shards.
+     * Handles formats: "lastSeq|endSeq" and "lastSeq@arrivalTime|endSeq".
+     * Since '@' and '|' are distinct, '|' always unambiguously marks the 
endSeq regardless of
+     * whether an arrival time is present.
+     */
+    public static String getEndSeqFromValue(String value) {
+      if (value == null || value.isEmpty()) {
+        return null;
+      }
+      int endSep = value.indexOf(END_SEQ_SEPARATOR);
+      return endSep >= 0 && endSep < value.length() - 1 ? 
value.substring(endSep + 1) : null;
+    }
+
+    /**
+     * Parse a checkpoint value into (lastSeq, endSeq). Combines {@link 
#getLastSeqFromValue} and
+     * {@link #getEndSeqFromValue} into a single call to avoid parsing the 
value string twice.
+     * @return Pair where left=lastSeq (empty Option when absent), 
right=endSeq (empty Option for open shards)
+     */
+    public static Pair<Option<String>, Option<String>> 
parseCheckpointValue(String value) {
+      return Pair.of(Option.ofNullable(getLastSeqFromValue(value)),
+          Option.ofNullable(getEndSeqFromValue(value)));
+    }
+
+    /**
+     * Build checkpoint value without arrival time: "lastSeq" or 
"lastSeq|endSeq".
+     */
+    public static String buildCheckpointValue(String lastSeq, String endSeq) {
+      return buildCheckpointValue(lastSeq, null, endSeq);
+    }
+
+    /**
+     * Build checkpoint value with optional arrival time.
+     * Format: lastSeq[@arrivalTime][|endSeq]
+     */
+    public static String buildCheckpointValue(String lastSeq, Long 
arrivalTime, String endSeq) {
+      StringBuilder sb = new StringBuilder(lastSeq != null ? lastSeq : "");
+      if (arrivalTime != null) {
+        sb.append(ARRIVAL_TIME_SEPARATOR).append(arrivalTime);
+      }
+      if (endSeq != null && !endSeq.isEmpty()) {
+        sb.append(END_SEQ_SEPARATOR).append(endSeq);
+      }
+      return sb.toString();
+    }
+
+    /**
+     * String representation of checkpoint.
+     * Format: streamName,shardId:value,shardId:value,... where value is 
lastSeq or lastSeq|endSeq.
+     */
+    public static String offsetsToStr(String streamName, Map<String, String> 
shardToValue) {
+      String parts = shardToValue.entrySet().stream()
+          .sorted(Map.Entry.comparingByKey())
+          .map(e -> e.getKey() + ":" + e.getValue())
+          .collect(Collectors.joining(","));
+      return streamName + "," + parts;
+    }
+
+    /**
+     * Returns true when {@code lastCheckpointStr} is a well-formed Kinesis 
checkpoint for {@code streamName}.
+     * Checks both format (streamName,shardId:seq,...) and that the embedded 
stream name matches.
+     */
+    public static boolean isValidStreamCheckpoint(Option<String> 
lastCheckpointStr, String streamName) {
+      return lastCheckpointStr.isPresent()
+          && PATTERN.matcher(lastCheckpointStr.get()).matches()
+          && lastCheckpointStr.get().startsWith(streamName + ",");
+    }
+  }
+
+  /**
+   * Represents a shard to read from, with optional starting sequence number.
+   * For closed shards, endingSequenceNumber is set so we can store it in the 
checkpoint
+   * and later detect data loss when the shard expires.
+   */
+  @AllArgsConstructor
+  @Getter
+  public static class KinesisShardRange implements java.io.Serializable {
+    private final String shardId;
+    /** If empty, use TRIM_HORIZON or LATEST based on config. */
+    private final Option<String> startingSequenceNumber;
+    /** For closed shards: the shard's ending sequence number. Empty for open 
shards. */
+    private final Option<String> endingSequenceNumber;
+
+    public static KinesisShardRange of(String shardId, Option<String> seqNum) {
+      return new KinesisShardRange(shardId, seqNum, Option.empty());
+    }
+
+    public static KinesisShardRange of(String shardId, Option<String> seqNum, 
Option<String> endSeq) {
+      return new KinesisShardRange(shardId, seqNum, endSeq);
+    }
+
+    /**
+     * Returns true if this range may have unread records, false if we can 
definitively determine it has none.
+     * Uses conservative default (useLatestWhenNoCheckpoint=false).
+     */
+    public boolean hasUnreadRecords() {
+      return hasUnreadRecords(false);
+    }
+
+    /**
+     * Returns true if this range may have unread records, false if we can 
definitively determine it has none.
+     * <ul>
+     *   <li>Open shard: always true (may have new records)</li>
+     *   <li>Closed shard, lastSeq >= endSeq: false (fully consumed)</li>
+     *   <li>Closed shard, no checkpoint and useLatest: false (at LATEST tip, 
closed shard has no records)</li>
+     *   <li>Otherwise: true (may have unread records or cannot definitively 
say)</li>
+     * </ul>
+     *
+     * @param useLatestWhenNoCheckpoint when startingSequenceNumber is empty, 
true means we use LATEST
+     *        (start at tip); for a closed shard there are no records to read
+     */
+    public boolean hasUnreadRecords(boolean useLatestWhenNoCheckpoint) {
+      String lastSeq = startingSequenceNumber.orElse(null);
+      String endSeq = endingSequenceNumber.orElse(null);
+
+      // CASE 1: Open shard: may have records
+      if (endSeq == null || endSeq.isEmpty()) {
+        return true;
+      }
+      // CASE 2: Closed shard with no checkpoint
+      if (lastSeq == null || lastSeq.isEmpty()) {
+        return !useLatestWhenNoCheckpoint;
+      }
+      // CASE 3: Closed shard: lastSeq >= endSeq means fully consumed
+      if (lastSeq.compareTo(endSeq) >= 0) {
+        return false;
+      }
+      // CASE 4: lastSeq < endSeq: may have unread records
+      return true;
+    }

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   <details>
   <summary>🧩 Analysis chain</summary>
   
   🌐 Web query:
   
   `Are AWS Kinesis sequence numbers always the same length within a shard, or 
can they vary in length?`
   
   💡 Result:
   
   AWS Kinesis sequence numbers can vary in length within a shard. They are 
strings matching the pattern 0|([1-9]\d{0,128}), where "0" has length 1, and 
others start with 1-9 followed by 0 to 128 digits, allowing lengths from 1 up 
to 129 characters.
   
   Citations:
   
   - 1: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html
   - 2: 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
   - 3: 
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
   
   ---
   
   🏁 Script executed:
   
   ```shell
   find . -type f -name "KinesisOffsetGen.java" | head -5
   ```
   
   Repository: yihua/hudi
   
   Length of output: 148
   
   ---
   
   🏁 Script executed:
   
   ```shell
   wc -l 
./hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 152
   
   ---
   
   🏁 Script executed:
   
   ```shell
   sed -n '252,270p' 
./hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 745
   
   ---
   
   🏁 Script executed:
   
   ```shell
   sed -n '460,470p' 
./hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 728
   
   ---
   
   🏁 Script executed:
   
   ```shell
   sed -n '494,504p' 
./hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 564
   
   ---
   
   🏁 Script executed:
   
   ```shell
   grep -n "compareTo" 
./hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
 | head -20
   ```
   
   Repository: yihua/hudi
   
   Length of output: 248
   
   ---
   
   🏁 Script executed:
   
   ```shell
   cat -n 
./hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
 | sed -n '210,215p'
   ```
   
   Repository: yihua/hudi
   
   Length of output: 352
   
   ---
   
   🏁 Script executed:
   
   ```shell
   sed -n '215,225p' 
./hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 609
   
   ---
   
   
   
   </details>
   
   **String.compareTo() incorrectly compares Kinesis sequence numbers, 
affecting unread records and data loss detection.**
   
   Kinesis sequence numbers are 128-bit integers represented as variable-length 
decimal strings (1–129 characters per AWS API docs). Using `String.compareTo()` 
performs lexicographic comparison, which produces incorrect results when 
lengths differ—for example, `"9".compareTo("10") > 0` but numerically `9 < 10`.
   
   This bug occurs at three locations:
   - **Line 265** in `hasUnreadRecords()`: Determines if a shard has unread 
records
   - **Line 464**: Detects fully consumed expired shards
   - **Line 498**: Detects data loss from trimmed records
   
   Replace all three with numeric comparison using `BigInteger`:
   
   ```java
   private static int compareSequenceNumbers(String seq1, String seq2) {
     return new java.math.BigInteger(seq1).compareTo(new 
java.math.BigInteger(seq2));
   }
   ```
   
   Then update each location:
   - Line 265: `lastSeq.compareTo(endSeq) >= 0` → 
`compareSequenceNumbers(lastSeq, endSeq) >= 0`
   - Line 464: `last.compareTo(endSeqOpt.get()) >= 0` → 
`compareSequenceNumbers(last, endSeqOpt.get()) >= 0`
   - Line 498: `lastSeq.compareTo(shardStartSeq) < 0` → 
`compareSequenceNumbers(lastSeq, shardStartSeq) < 0`
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java`
   around lines 252 - 270, The current logic in KinesisOffsetGen uses
   String.compareTo on Kinesis sequence number strings (e.g.,
   lastSeq.compareTo(endSeq) and similar checks), which is lexicographic and
   incorrect for numeric sequence values; add a helper method
   compareSequenceNumbers(String seq1, String seq2) that converts the strings to
   java.math.BigInteger and returns their numeric comparison, then replace all
   string comparisons: in hasUnreadRecords replace lastSeq.compareTo(endSeq) >= 0
   with compareSequenceNumbers(lastSeq, endSeq) >= 0, update the 
shard-expiration
   check that uses last.compareTo(endSeqOpt.get()) >= 0 to use
   compareSequenceNumbers(last, endSeqOpt.get()) >= 0, and replace the 
trimmed-data
   detection lastSeq.compareTo(shardStartSeq) < 0 with
   compareSequenceNumbers(lastSeq, shardStartSeq) < 0 so all sequence 
comparisons
   are numeric.
   ```
   
   </details>
   
   <!-- 
fingerprinting:phantom:poseidon:ocelot:7a2a8930-2f98-4e83-9519-06ffc64e3eaa -->
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/19#discussion_r3036410514)) 
(source:comment#3036410514)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java:
##########
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.KinesisSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
+import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
+import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import 
software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+
+/**
+ * Helper for reading from Kinesis Data Streams and managing checkpoints.
+ * Checkpoint format: 
streamName,shardId:sequenceNumber,shardId:sequenceNumber,...
+ */
+@Slf4j
+@Getter
+public class KinesisOffsetGen {
+
+  public static class CheckpointUtils {
+    /** Separator between lastSeq and endSeq for closed shards. Seq numbers 
are numeric, so this is safe. */
+    private static final String END_SEQ_SEPARATOR = "|";
+    /**
+     * Separator between lastSeq and arrivalTime (epoch millis of the record 
with last sequence number).
+     * '@' is used as it is absent from numeric Kinesis sequence numbers and 
visually distinct from '|'.
+     */
+    private static final String ARRIVAL_TIME_SEPARATOR = "@";
+    /**
+     * Kinesis checkpoint pattern.
+     * Format: streamName,shardId:lastSeq[@arrivalTime][|endSeq],...
+     * For closed shards we store lastSeq|endSeq (or 
lastSeq@arrivalTime|endSeq with arrival time)
+     * so we can detect data loss when shard expires.
+     */
+    private static final Pattern PATTERN = Pattern.compile(".*,.*:.*");
+
+    /**
+     * Parse checkpoint string to shardId -> value map.
+     * Value format: lastSeq, or lastSeq@arrivalTime, or lastSeq|endSeq, or 
lastSeq@arrivalTime|endSeq.
+     */
+    public static Map<String, String> strToOffsets(String checkpointStr) {
+      Map<String, String> offsetMap = new HashMap<>();
+      String[] splits = checkpointStr.split(",");
+      for (int i = 1; i < splits.length; i++) {
+        String part = splits[i];
+        int colonIdx = part.indexOf(':');
+        if (colonIdx > 0 && colonIdx < part.length() - 1) {
+          String shardId = part.substring(0, colonIdx);
+          String value = part.substring(colonIdx + 1);
+          offsetMap.put(shardId, value);
+        }
+      }
+      return offsetMap;
+    }
+
+    /**
+     * Extract lastSeq from a checkpoint value.
+     * Handles formats: "lastSeq", "lastSeq|endSeq", "lastSeq@arrivalTime", 
"lastSeq@arrivalTime|endSeq".
+     */
+    public static String getLastSeqFromValue(String value) {
+      if (value == null || value.isEmpty()) {
+        return value;
+      }
+      int arrivalSep = value.indexOf(ARRIVAL_TIME_SEPARATOR);
+      if (arrivalSep >= 0) {
+        return value.substring(0, arrivalSep);
+      }
+      int endSep = value.indexOf(END_SEQ_SEPARATOR);
+      return endSep >= 0 ? value.substring(0, endSep) : value;
+    }
+
+    /**
+     * Extract arrivalTime (epoch millis) from a checkpoint value if present, 
otherwise null.
+     * Handles formats: "lastSeq@arrivalTime" and "lastSeq@arrivalTime|endSeq".
+     */
+    public static Long getArrivalTimeFromValue(String value) {
+      if (value == null || value.isEmpty()) {
+        return null;
+      }
+      int arrivalSep = value.indexOf(ARRIVAL_TIME_SEPARATOR);
+      if (arrivalSep < 0) {
+        return null;
+      }
+      String rest = value.substring(arrivalSep + 
ARRIVAL_TIME_SEPARATOR.length());
+      int endSep = rest.indexOf(END_SEQ_SEPARATOR);
+      String arrivalStr = endSep >= 0 ? rest.substring(0, endSep) : rest;
+      try {
+        return Long.parseLong(arrivalStr);
+      } catch (NumberFormatException e) {
+        return null;
+      }
+    }
+
+    /**
+     * Extract endSeq from a checkpoint value if present. Returns null for 
open shards.
+     * Handles formats: "lastSeq|endSeq" and "lastSeq@arrivalTime|endSeq".
+     * Since '@' and '|' are distinct, '|' always unambiguously marks the 
endSeq regardless of
+     * whether an arrival time is present.
+     */
+    public static String getEndSeqFromValue(String value) {
+      if (value == null || value.isEmpty()) {
+        return null;
+      }
+      int endSep = value.indexOf(END_SEQ_SEPARATOR);
+      return endSep >= 0 && endSep < value.length() - 1 ? 
value.substring(endSep + 1) : null;
+    }
+
+    /**
+     * Parses a checkpoint value into (lastSeq, endSeq) by delegating to
+     * {@link #getLastSeqFromValue} and {@link #getEndSeqFromValue}, so callers need only one call.
+     * Each delegate scans the value string on its own.
+     *
+     * @param value per-shard checkpoint value; may be null or empty
+     * @return Pair where left wraps the result of {@link #getLastSeqFromValue} and right wraps the
+     *         result of {@link #getEndSeqFromValue} (empty Option for open shards). Left is empty
+     *         for a null value; for an empty lastSeq part the delegate returns "" rather than null,
+     *         so left presumably holds "" in that case (depends on Option.ofNullable — confirm).
+     *         Callers should therefore also check for an empty string.
+     */
+    public static Pair<Option<String>, Option<String>> 
parseCheckpointValue(String value) {
+      return Pair.of(Option.ofNullable(getLastSeqFromValue(value)),
+          Option.ofNullable(getEndSeqFromValue(value)));
+    }
+
+    /**
+     * Builds a checkpoint value without arrival time: "lastSeq" or "lastSeq|endSeq".
+     * Delegates to the three-argument overload with a null arrival time.
+     *
+     * @param lastSeq lastSeq component
+     * @param endSeq endSeq component
+     * @return the encoded checkpoint value
+     */
+    public static String buildCheckpointValue(String lastSeq, String endSeq) {
+      return buildCheckpointValue(lastSeq, null, endSeq);
+    }
+
+    /**
+     * Builds a checkpoint value with optional arrival time.
+     * Format: lastSeq[@arrivalTime][|endSeq]
+     *
+     * @param lastSeq lastSeq component; null is written as an empty string
+     * @param arrivalTime arrival time appended after the arrival-time separator; null omits it
+     * @param endSeq endSeq component appended after the end-sequence separator; null or empty omits it
+     * @return the encoded checkpoint value
+     */
+    public static String buildCheckpointValue(String lastSeq, Long 
arrivalTime, String endSeq) {
+      StringBuilder sb = new StringBuilder(lastSeq != null ? lastSeq : "");
+      if (arrivalTime != null) {
+        sb.append(ARRIVAL_TIME_SEPARATOR).append(arrivalTime);
+      }
+      if (endSeq != null && !endSeq.isEmpty()) {
+        sb.append(END_SEQ_SEPARATOR).append(endSeq);
+      }
+      return sb.toString();
+    }
+
+    /**
+     * String representation of checkpoint.
+     * Format: streamName,shardId:value,shardId:value,... where each value is written exactly as
+     * stored in the map (e.g. lastSeq, lastSeq|endSeq, or a form that also carries @arrivalTime).
+     * Entries are ordered by shardId so the same map always yields the same string.
+     *
+     * @param streamName stream name written as the first comma-separated element
+     * @param shardToValue shardId to checkpoint value; an empty map yields "streamName,"
+     * @return the encoded checkpoint string
+     */
+    public static String offsetsToStr(String streamName, Map<String, String> 
shardToValue) {
+      String parts = shardToValue.entrySet().stream()
+          .sorted(Map.Entry.comparingByKey())
+          .map(e -> e.getKey() + ":" + e.getValue())
+          .collect(Collectors.joining(","));
+      return streamName + "," + parts;
+    }
+
+    /**
+     * Returns true when {@code lastCheckpointStr} is a well-formed Kinesis checkpoint for
+     * {@code streamName}. All three conditions must hold: the Option is present, the whole string
+     * matches PATTERN (presumably the streamName,shardId:seq,... layout — confirm against its
+     * definition), and the string starts with {@code streamName} followed by a comma.
+     *
+     * @param lastCheckpointStr checkpoint string from the previous commit, if any
+     * @param streamName stream the checkpoint is expected to belong to
+     * @return false for an empty Option, a malformed string, or a different stream
+     */
+    public static boolean isValidStreamCheckpoint(Option<String> 
lastCheckpointStr, String streamName) {
+      return lastCheckpointStr.isPresent()
+          && PATTERN.matcher(lastCheckpointStr.get()).matches()
+          && lastCheckpointStr.get().startsWith(streamName + ",");
+    }
+  }
+
+  /**
+   * Represents a shard to read from, with optional starting sequence number.
+   * For closed shards, endingSequenceNumber is set so we can store it in the checkpoint
+   * and later detect data loss when the shard expires.
+   * Instances are immutable (all fields final); the constructor and getters are generated by Lombok.
+   */
+  @AllArgsConstructor
+  @Getter
+  public static class KinesisShardRange implements java.io.Serializable {
+    private final String shardId;
+    /** If empty, use TRIM_HORIZON or LATEST based on config. */
+    private final Option<String> startingSequenceNumber;
+    /** For closed shards: the shard's ending sequence number. Empty for open shards. */
+    private final Option<String> endingSequenceNumber;
+
+    public static KinesisShardRange of(String shardId, Option<String> seqNum) { // leaves endingSequenceNumber empty
+      return new KinesisShardRange(shardId, seqNum, Option.empty());
+    }
+
+    public static KinesisShardRange of(String shardId, Option<String> seqNum, 
Option<String> endSeq) { // plain pass-through to the all-args constructor
+      return new KinesisShardRange(shardId, seqNum, endSeq);
+    }
+
+    /**
+     * Returns true if this range may have unread records, false if we can definitively
+     * determine it has none.
+     * Uses conservative default (useLatestWhenNoCheckpoint=false), so a closed shard without a
+     * starting sequence number is reported as possibly unread.
+     */
+    public boolean hasUnreadRecords() {
+      return hasUnreadRecords(false);
+    }
+
+    /**
+     * Returns true if this range may have unread records, false if we can definitively
+     * determine it has none.
+     * <ul>
+     *   <li>Open shard (ending sequence number absent or empty): always true (may have new records)</li>
+     *   <li>Closed shard, no checkpoint and useLatest: false (at LATEST tip, closed shard has no records)</li>
+     *   <li>Closed shard, no checkpoint and not useLatest: true</li>
+     *   <li>Closed shard, {@code lastSeq >= endSeq}: false (fully consumed)</li>
+     *   <li>Otherwise: true (may have unread records or cannot definitively say)</li>
+     * </ul>
+     * NOTE(review): the comparison is {@code String.compareTo}, i.e. lexicographic. That agrees
+     * with numeric order only when both sequence numbers have the same number of digits —
+     * confirm this assumption holds for every source of endSeq.
+     *
+     * @param useLatestWhenNoCheckpoint when startingSequenceNumber is empty, true means we use LATEST
+     *        (start at tip); for a closed shard there are no records to read
+     * @return true when records may remain, false only when the range is known to be exhausted
+     */
+    public boolean hasUnreadRecords(boolean useLatestWhenNoCheckpoint) {
+      String lastSeq = startingSequenceNumber.orElse(null);
+      String endSeq = endingSequenceNumber.orElse(null);
+
+      // CASE 1: Open shard: may have records
+      if (endSeq == null || endSeq.isEmpty()) {
+        return true;
+      }
+      // CASE 2: Closed shard with no checkpoint
+      if (lastSeq == null || lastSeq.isEmpty()) {
+        return !useLatestWhenNoCheckpoint;
+      }
+      // CASE 3: Closed shard: lastSeq >= endSeq means fully consumed
+      if (lastSeq.compareTo(endSeq) >= 0) { // lexicographic comparison, see the method Javadoc
+        return false;
+      }
+      // CASE 4: lastSeq < endSeq: may have unread records
+      return true;
+    }
+  }
+
+  /** Name of the Kinesis Data Stream to consume from (e.g. "my-stream"). */
+  private final String streamName;
+  /** AWS region where the Kinesis stream is provisioned (e.g. "us-east-1"). 
Required because KinesisClient is region-scoped — each client instance connects 
to exactly one regional endpoint. */
+  private final String region;
+  /** Optional custom Kinesis endpoint URL, used for localstack or VPC 
endpoints. Empty when using the default AWS endpoint. */
+  private final Option<String> endpointUrl;
+  /** Strategy that determines where to start reading when no prior checkpoint 
exists (LATEST, TRIM_HORIZON, etc.). */
+  private final KinesisSourceConfig.KinesisStartingPositionStrategy 
startingPositionStrategy;
+  /** Raw configuration properties passed from HoodieStreamer; used to resolve 
Kinesis-specific and shared settings. */
+  private final TypedProperties props;
+
+  public KinesisOffsetGen(TypedProperties props) {
+    this.props = props; // kept because createKinesisClient() and getNextShardRanges() read further settings from it
+    checkRequiredConfigProperties(props,
+        Arrays.asList(KinesisSourceConfig.KINESIS_STREAM_NAME, 
KinesisSourceConfig.KINESIS_REGION)); // stream name and region are the only mandatory settings checked here
+    this.streamName = getStringWithAltKeys(props, 
KinesisSourceConfig.KINESIS_STREAM_NAME);
+    this.region = getStringWithAltKeys(props, 
KinesisSourceConfig.KINESIS_REGION);
+    this.endpointUrl = Option.ofNullable(getStringWithAltKeys(props, 
KinesisSourceConfig.KINESIS_ENDPOINT_URL, null)); // optional: empty Option when no endpoint is configured
+    this.startingPositionStrategy = 
KinesisSourceConfig.KinesisStartingPositionStrategy.fromString(
+        getStringWithAltKeys(props, 
KinesisSourceConfig.KINESIS_STARTING_POSITION, true)); // presumably "true" falls back to the config default — confirm
+  }
+
+  /**
+   * Builds a Kinesis client from explicit parameters. Used by both the instance method
+   * {@link #createKinesisClient()} and by {@link org.apache.hudi.utilities.sources.JsonKinesisSource}
+   * from serializable {@link KinesisReadConfig} in Spark closures.
+   *
+   * @param region AWS region name passed to {@code Region.of}
+   * @param endpointUrl endpoint override; null or empty keeps the builder's default endpoint
+   * @param accessKey access key; static credentials are used only when both keys are non-empty
+   * @param secretKey secret key; when either key is missing no credentials provider is set, so the
+   *        builder's default applies (presumably the SDK default credentials chain — confirm)
+   * @return a new client; {@code getNextShardRanges} closes its client via try-with-resources
+   */
+  public static KinesisClient createKinesisClient(String region, String 
endpointUrl,
+      String accessKey, String secretKey) {
+    KinesisClientBuilder builder = 
KinesisClient.builder().region(Region.of(region));
+    if (endpointUrl != null && !endpointUrl.isEmpty()) {
+      builder = builder.endpointOverride(URI.create(endpointUrl));
+    }
+    if (accessKey != null && !accessKey.isEmpty() && secretKey != null && 
!secretKey.isEmpty()) {
+      builder = builder.credentialsProvider(
+          
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, 
secretKey)));
+    }
+    return builder.build();
+  }
+
+  public KinesisClient createKinesisClient() { // instance variant: region/endpoint from fields, credentials from props
+    String accessKey = getStringWithAltKeys(props, 
KinesisSourceConfig.KINESIS_ACCESS_KEY, null);
+    String secretKey = getStringWithAltKeys(props, 
KinesisSourceConfig.KINESIS_SECRET_KEY, null);
+    return createKinesisClient(region, endpointUrl.orElse(null), accessKey, 
secretKey); // null endpoint or keys are handled by the static overload
+  }
+
+  /**
+   * Lists every shard the ListShards API returns for the stream, open and closed alike,
+   * following the pagination token until it is exhausted.
+   * Note: AWS API disallows streamName and nextToken in the same request, so only the first
+   * request carries the stream name and follow-up requests carry just the token.
+   * Also logs the open/closed counts and each shard's sequence-number range.
+   *
+   * @param client Kinesis client to query; not closed by this method
+   * @return all shards collected across pages, in the order returned by the service
+   * @throws HoodieReadFromSourceException when the stream is not found or the request is
+   *         rejected for exceeding throughput or request limits (the SDK exception is the cause)
+   */
+  public List<Shard> listShards(KinesisClient client) {
+    List<Shard> allShards = new ArrayList<>();
+    String nextToken = null;
+    do {
+      ListShardsRequest request = nextToken != null
+          ? ListShardsRequest.builder().nextToken(nextToken).build()
+          : ListShardsRequest.builder().streamName(streamName).build();
+      ListShardsResponse response;
+      try {
+        response = client.listShards(request);
+      } catch (ResourceNotFoundException e) {
+        throw new HoodieReadFromSourceException("Kinesis stream " + streamName 
+ " not found", e);
+      } catch (ProvisionedThroughputExceededException e) {
+        throw new HoodieReadFromSourceException("Kinesis throughput exceeded 
listing shards for " + streamName, e);
+      } catch (LimitExceededException e) {
+        throw new HoodieReadFromSourceException("Kinesis limit exceeded 
listing shards: " + e.getMessage(), e);
+      }
+      allShards.addAll(response.shards());
+      nextToken = response.nextToken();
+    } while (nextToken != null);
+
+    // Include both open and closed shards. Closed shards (e.g., from 
resharding) may still contain
+    // unread records within the retention period. GetRecords works on closed 
shards until all data
+    // is consumed, at which point NextShardIterator returns null.
+    long numOpenShards = allShards.stream()
+        .filter(s -> s.sequenceNumberRange() != null && 
s.sequenceNumberRange().endingSequenceNumber() == null)
+        .count(); // a shard counts as open when its range has no ending sequence number
+    log.info("Found {} shards for stream {} ({} open, {} closed)",
+        allShards.size(), streamName, numOpenShards, allShards.size() - 
numOpenShards);
+    logShardSequenceRanges(allShards);
+    return allShards;
+  }
+
+  /**
+   * Logs each shard's start/end sequence number so they can be used when resetting the checkpoint.
+   * One INFO line is written per shard. A missing starting sequence number is printed as "n/a";
+   * a shard without an ending sequence number is printed as open. Each line also shows the
+   * shardId:startSeq (plus |endSeq for closed shards) text to paste into a checkpoint.
+   *
+   * @param shards shards to describe
+   */
+  private void logShardSequenceRanges(List<Shard> shards) {
+    for (Shard shard : shards) {
+      String startSeq = (shard.sequenceNumberRange() != null && 
shard.sequenceNumberRange().startingSequenceNumber() != null)
+          ? shard.sequenceNumberRange().startingSequenceNumber() : "n/a";
+      String endSeq = (shard.sequenceNumberRange() != null && 
shard.sequenceNumberRange().endingSequenceNumber() != null)
+          ? shard.sequenceNumberRange().endingSequenceNumber() : null; // null selects the open-shard message below
+      if (endSeq != null) {
+        log.info("Shard {}: startSeq={}, endSeq={} (for checkpoint reset: 
{}:{}|{})",
+            shard.shardId(), startSeq, endSeq, shard.shardId(), startSeq, 
endSeq);
+      } else {
+        log.info("Shard {}: startSeq={}, endSeq=open (for checkpoint reset 
from start: {}:{})",
+            shard.shardId(), startSeq, shard.shardId(), startSeq);
+      }
+    }
+  }
+
+  /**
+   * Get shard ranges to read, based on checkpoint and limits.
+   */
+  public KinesisShardRange[] getNextShardRanges(Option<Checkpoint> 
lastCheckpoint, long sourceLimit) {
+    long numEvents = calculateNumEvents(sourceLimit, props);
+
+    try (KinesisClient client = createKinesisClient()) {
+      // STEP 1: List all open and closed shards from the server.
+      // Note: no expired shards.
+      List<Shard> shards = listShards(client);
+      if (shards.isEmpty()) {
+        return new KinesisShardRange[0];
+      }
+      // STEP 2: parse last checkpoint if exists.
+      Map<String, String> fromSequenceNumbers = new HashMap<>();
+      Option<String> lastCheckpointStr = lastCheckpoint.isPresent()
+          ? Option.of(lastCheckpoint.get().getCheckpointKey()) : 
Option.empty();
+      if (CheckpointUtils.isValidStreamCheckpoint(lastCheckpointStr, 
streamName)) {
+        Map<String, String> checkpointOffsets = 
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+        if (!checkpointOffsets.isEmpty()) {
+          // Check for expired shards (checkpoint references shards no longer 
in stream, e.g., past retention)
+          Set<String> availableShardIds = 
shards.stream().map(Shard::shardId).collect(Collectors.toSet());
+          List<String> expiredShardIds = checkpointOffsets.keySet().stream()
+              .filter(id -> !availableShardIds.contains(id))
+              .collect(Collectors.toList());
+          // Handle expired shards that exist in the last checkpoint.
+          // This is important to detect data loss.
+          if (!expiredShardIds.isEmpty()) {
+            checkDataLossOnExpiredShards(expiredShardIds, checkpointOffsets);
+          }
+          // Handle regular case.
+          // Parse lastSeq for open and closed shards.
+          // For closed shards, even if all their records have been consumed, 
they are still included.
+          for (String shardId : availableShardIds) {
+            if (checkpointOffsets.containsKey(shardId)) {
+              Option<String> lastSeqOpt = Option.ofNullable(
+                  
CheckpointUtils.getLastSeqFromValue(checkpointOffsets.get(shardId)))
+                  .flatMap(seq -> seq.isEmpty() ? Option.empty() : 
Option.of(seq));
+              lastSeqOpt.ifPresent(seq -> fromSequenceNumbers.put(shardId, 
seq));
+            }
+          }
+          // Check if any available shard's checkpoint lastSeq has fallen 
behind the shard's
+          // trim horizon, i.e., records between lastSeq and the earliest 
available seq were trimmed.
+          checkDataLossOnAvailableShards(shards, fromSequenceNumbers);
+        }
+      }
+      // STEP 3: Create ranges.
+      List<KinesisShardRange> ranges = new ArrayList<>();
+      for (Shard shard : shards) {
+        String shardId = shard.shardId();
+        Option<String> startSeq = fromSequenceNumbers.containsKey(shardId)
+            ? Option.of(fromSequenceNumbers.get(shardId))
+            : Option.empty();
+        Option<String> endSeq = shard.sequenceNumberRange() != null
+            ? 
Option.ofNullable(shard.sequenceNumberRange().endingSequenceNumber())
+            : Option.empty();
+        ranges.add(KinesisShardRange.of(shardId, startSeq, endSeq));
+      }
+      log.info("About to read up to {} events from {} shards in stream {})",
+          numEvents, ranges.size(), streamName);

Review Comment:
   _⚠️ Potential issue_ | _🟑 Minor_
   
   **Typo in log message: unmatched closing parenthesis.**
   
   The log message has an extra `)` at the end that appears unintentional.
   
   
   <details>
   <summary>✏️ Proposed fix</summary>
   
   ```diff
   -      log.info("About to read up to {} events from {} shards in stream {})",
   +      log.info("About to read up to {} events from {} shards in stream {}",
              numEvents, ranges.size(), streamName);
   ```
   </details>
   
   <!-- suggestion_start -->
   
   <details>
   <summary>📝 Committable suggestion</summary>
   
   > ‼️ **IMPORTANT**
   > Carefully review the code before committing. Ensure that it accurately 
replaces the highlighted code, contains no missing lines, and has no issues 
with indentation. Thoroughly test & benchmark the code to ensure it meets the 
requirements.
   
   ```suggestion
         log.info("About to read up to {} events from {} shards in stream {}",
             numEvents, ranges.size(), streamName);
   ```
   
   </details>
   
   <!-- suggestion_end -->
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java`
   around lines 433 - 434, The log message in KinesisOffsetGen's log.info call 
has
   an extra closing parenthesis at the end of the format string; update the
   log.info invocation (the line that logs "About to read up to {} events from 
{}
   shards in stream {})") to remove the stray ')' so the format string is 
"About to
   read up to {} events from {} shards in stream {}", keeping the same arguments
   (numEvents, ranges.size(), streamName).
   ```
   
   </details>
   
   <!-- 
fingerprinting:phantom:poseidon:ocelot:7a2a8930-2f98-4e83-9519-06ffc64e3eaa -->
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/19#discussion_r3036410517)) 
(source:comment#3036410517)



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -3112,6 +3120,348 @@ public void testJsonKafkaDFSSource() throws Exception {
     deltaStreamer.shutdownGracefully();
   }
 
+  /**
+   * Tests DeltaStreamer ingestion from JsonKinesisSource using LocalStack.
+   */
+  @Nested
+  
@org.junit.jupiter.api.TestInstance(org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS)
+  class TestKinesisSource {
+
+    private KinesisTestUtils kinesisTestUtils;
+
+    @BeforeAll
+    void initKinesis() { // runs once for the nested class; the utils instance is shared by all tests in it
+      kinesisTestUtils = new KinesisTestUtils().setup();
+    }
+
+    @AfterAll
+    void tearDownKinesis() {
+      if (kinesisTestUtils != null) { // stays null if initKinesis failed before the assignment
+        kinesisTestUtils.teardown();
+      }
+    }
+
+    @Test
+    public void testJsonKinesisDFSSource() throws Exception {
+      String streamName = "test-kinesis-stream-" + testNum; // NOTE(review): testNum is not incremented in this test, unlike testJsonKinesisAggregatedRecords — confirm intended
+      // Create stream with at least 2 shards to exercise multi-shard reading
+      prepareJsonKinesisDFSFiles(JSON_KINESIS_NUM_RECORDS, true, streamName);
+      prepareJsonKinesisDFSSource(PROPS_FILENAME_TEST_JSON_KINESIS, 
streamName);
+      String tableBasePath = basePath + "/test_json_kinesis_table" + testNum;
+      HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+          TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
LocalStackJsonKinesisSource.class.getName(),
+              Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KINESIS, false,
+              true, 100000, false, null, null, "timestamp", null), jsc);
+
+      // Completion time of each commit, in order; used as incremental query bounds below
+      List<String> commitCompletionTimes = new ArrayList<>();
+
+      // Batch 1: initial ingestion of the records published above
+      deltaStreamer.sync();
+      assertRecordCount(JSON_KINESIS_NUM_RECORDS, tableBasePath, sqlContext);
+      String commit1 = getLatestCommitInstantTime(tableBasePath);
+      commitCompletionTimes.add(getCommitCompletionTime(tableBasePath));
+      assertRecordsForCommit(tableBasePath, commit1, JSON_KINESIS_NUM_RECORDS);
+
+      int totalRecords = JSON_KINESIS_NUM_RECORDS;
+      // Batch 2: publish 10 more records to the same stream and sync again
+      int batch2Records = 10;
+      totalRecords += batch2Records;
+      prepareJsonKinesisDFSFiles(batch2Records, false, streamName);
+      deltaStreamer.sync();
+      assertRecordCount(totalRecords, tableBasePath, sqlContext);
+      String commit2 = getLatestCommitInstantTime(tableBasePath);
+      commitCompletionTimes.add(getCommitCompletionTime(tableBasePath));
+      assertRecordsForCommit(tableBasePath, commit2, batch2Records);
+
+      // Batch 3: 8 more records
+      int batch3Records = 8;
+      totalRecords += batch3Records;
+      prepareJsonKinesisDFSFiles(batch3Records, false, streamName);
+      deltaStreamer.sync();
+      assertRecordCount(totalRecords, tableBasePath, sqlContext);
+      String commit3 = getLatestCommitInstantTime(tableBasePath);
+      commitCompletionTimes.add(getCommitCompletionTime(tableBasePath));
+      assertRecordsForCommit(tableBasePath, commit3, batch3Records);
+
+      // Batch 4: 7 more records
+      int batch4Records = 7;
+      totalRecords += batch4Records;
+      prepareJsonKinesisDFSFiles(batch4Records, false, streamName);
+      deltaStreamer.sync();
+      assertRecordCount(totalRecords, tableBasePath, sqlContext);
+      String commit4 = getLatestCommitInstantTime(tableBasePath);
+      commitCompletionTimes.add(getCommitCompletionTime(tableBasePath));
+      assertRecordsForCommit(tableBasePath, commit4, batch4Records);
+      deltaStreamer.shutdownGracefully();
+
+      // Incremental queries for a subset of commits, bounded by the completion times collected above
+      sqlContext.clearCache();
+      String c1 = commitCompletionTimes.get(0);
+      String c2 = commitCompletionTimes.get(1);
+      String c3 = commitCompletionTimes.get(2);
+      String c4 = commitCompletionTimes.get(3);
+
+      // Incremental: first commit only (batch 1)
+      Dataset<Row> incrBatch1 = sqlContext.read().format("org.apache.hudi")
+          .options(hudiOpts)
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+          .option(DataSourceReadOptions.START_COMMIT().key(), "000")
+          .option(DataSourceReadOptions.END_COMMIT().key(), c1)
+          .load(tableBasePath);
+      assertEquals(JSON_KINESIS_NUM_RECORDS, incrBatch1.count(), "Incremental 
read for batch 1");
+
+      // Incremental: commits 2 and 3 (batches 2 + 3); the expected count omits batch 1, so start bound c1 acts as exclusive
+      Dataset<Row> incrBatches2And3 = 
sqlContext.read().format("org.apache.hudi")
+          .options(hudiOpts)
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+          .option(DataSourceReadOptions.START_COMMIT().key(), c1)
+          .option(DataSourceReadOptions.END_COMMIT().key(), c3)
+          .load(tableBasePath);
+      assertEquals(batch2Records + batch3Records, incrBatches2And3.count(), 
"Incremental read for batches 2 and 3");
+
+      // Incremental: commits 3 and 4 (batches 3 + 4)
+      Dataset<Row> incrBatches3And4 = 
sqlContext.read().format("org.apache.hudi")
+          .options(hudiOpts)
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+          .option(DataSourceReadOptions.START_COMMIT().key(), c2)
+          .option(DataSourceReadOptions.END_COMMIT().key(), c4)
+          .load(tableBasePath);
+      assertEquals(batch3Records + batch4Records, incrBatches3And4.count(), 
"Incremental read for batches 3 and 4");
+
+      // Snapshot read: total row count must match, and at least one row must carry a non-null _row_key
+      sqlContext.clearCache();
+      Dataset<Row> ds = 
sqlContext.read().format("org.apache.hudi").load(tableBasePath);
+      assertEquals(totalRecords, ds.count());
+      assertTrue(ds.filter("_row_key is not null").count() > 0);
+    }
+
+    @Test
+    public void testJsonKinesisAggregatedRecords() throws Exception {
+      String streamName = "test-kinesis-stream-agg-" + testNum;
+      kinesisTestUtils.createStream(streamName, 2);
+      HoodieTestDataGenerator dataGenerator = new 
HoodieTestDataGenerator(System.nanoTime());
+      String[] jsonRecords = UtilitiesTestBase.Helpers.jsonifyRecords(
+          dataGenerator.generateInsertsAsPerSchema("000", 6, 
HoodieTestDataGenerator.TRIP_SCHEMA));
+      kinesisTestUtils.sendAggregatedRecords(streamName, jsonRecords); // the 6 records are published via the aggregated-record helper
+      prepareJsonKinesisDFSSource(PROPS_FILENAME_TEST_JSON_KINESIS, 
streamName);
+      String tableBasePath = basePath + "/test_json_kinesis_agg_table" + 
testNum;
+      HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+          TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
LocalStackJsonKinesisSource.class.getName(),
+              Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KINESIS, false,
+              true, 100000, false, null, null, "timestamp", null), jsc);
+      deltaStreamer.sync();
+      assertRecordCount(6, tableBasePath, sqlContext); // all 6 generated records must land as individual rows
+      deltaStreamer.shutdownGracefully();
+      testNum++; // stream and table names above are derived from testNum
+    }
+
+    @Test
+    public void testJsonKinesisShardSplitCheckpoint() throws Exception {
+      String streamName = "test-kinesis-stream-split-" + testNum;
+      kinesisTestUtils.createStream(streamName, 2);
+      prepareJsonKinesisDFSFiles(10, false, streamName);
+      prepareJsonKinesisDFSSource(PROPS_FILENAME_TEST_JSON_KINESIS, 
streamName);
+      String tableBasePath = basePath + "/test_json_kinesis_split_table" + 
testNum;
+      HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+          TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
LocalStackJsonKinesisSource.class.getName(),
+              Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KINESIS, false,
+              true, 100000, false, null, null, "timestamp", null), jsc);
+      deltaStreamer.sync();
+      assertRecordCount(10, tableBasePath, sqlContext);
+      String checkpointAfterBatch1 = 
getCheckpointFromLatestCommit(tableBasePath);
+      assertNotNull(checkpointAfterBatch1);
+      assertTrue(checkpointAfterBatch1.startsWith(streamName + ","));
+      assertTrue(checkpointAfterBatch1.contains(":"), "Checkpoint should have 
shard:seq format");
+
+      kinesisTestUtils.updateShardCount(streamName, 4);
+      prepareJsonKinesisDFSFiles(5, false, streamName);
+      deltaStreamer.sync();
+      assertRecordCount(15, tableBasePath, sqlContext);
+      String checkpointAfterSplit = 
getCheckpointFromLatestCommit(tableBasePath);
+      assertNotNull(checkpointAfterSplit);
+      assertTrue(checkpointAfterSplit.startsWith(streamName + ","));
+      // Closed parent shards must have lastSeq|endSeq with endSeq <= lastSeq 
so we can detect
+      // "fully consumed" when parent expires. LocalStack returns 
Long.MAX_VALUE; we replace with lastSeq.
+      assertFalse(checkpointAfterSplit.contains("9223372036854775807"),
+          "Checkpoint should not contain Long.MAX_VALUE as endSeq (parent 
expiry would fail)");
+      // Verify closed shard checkpoint format: lastSeq|endSeq with lastSeq >= 
endSeq (fully consumed)
+      Map<String, String> offsets = 
KinesisOffsetGen.CheckpointUtils.strToOffsets(checkpointAfterSplit);
+      for (Map.Entry<String, String> e : offsets.entrySet()) {
+        Pair<Option<String>, Option<String>> seqs = 
KinesisOffsetGen.CheckpointUtils.parseCheckpointValue(e.getValue());
+        String lastSeq = seqs.getLeft().orElse(null);
+        String endSeq = seqs.getRight().orElse(null);
+        if (endSeq != null && lastSeq != null && !lastSeq.isEmpty()) {
+          assertTrue(lastSeq.compareTo(endSeq) >= 0,
+              "Closed shard " + e.getKey() + ": lastSeq should be >= endSeq 
(fully consumed)");

Review Comment:
   _⚠️ Potential issue_ | _🟑 Minor_
   
   **Compare sequence numbers numerically, not lexicographically.**
   
   Lines 3295 and 3383 use `String.compareTo`, which is lexicographic. If 
Kinesis emits sequence numbers with different widths, the assertion can treat 
`"9"` as greater than `"10"` and misclassify a closed shard as fully consumed.
   
   
   
   Also applies to: 3377-3384
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java`
   around lines 3289 - 3296, The assertion currently compares Kinesis sequence
   numbers as strings using String.compareTo which is lexicographic and 
incorrect
   for numeric sequence IDs; change the comparison to numeric by parsing the
   sequence values (from KinesisOffsetGen.CheckpointUtils.strToOffsets /
   parseCheckpointValue output: lastSeq and endSeq) into a numeric type (use
   BigInteger to be safe for very large sequence IDs, or Long if you confirm
   bounds) and then compare numerically (e.g., bigLast.compareTo(bigEnd) >= 0).
   Apply the same numeric-parse-and-compare fix to both occurrences that use
   String.compareTo to ensure closed shards are validated correctly.
   ```
   
   </details>
   
   <!-- 
fingerprinting:phantom:medusa:grasshopper:2d6277d7-75f4-4e28-b60a-1efa18b71895 
-->
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/19#discussion_r3036410529)) 
(source:comment#3036410529)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.KinesisSourceConfig;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen;
+import org.apache.hudi.utilities.sources.helpers.KinesisReadConfig;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamContext;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
+import static 
org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen.calculateNumEvents;
+
+/**
+ * Source to read JSON data from AWS Kinesis Data Streams using Spark.
+ */
+@Slf4j
+public class JsonKinesisSource extends KinesisSource<JavaRDD<String>> {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /** Metadata-only summary for checkpoint; avoids bringing records to driver. 
*/
+  @AllArgsConstructor
+  @Getter
+  private static class ShardFetchSummary implements Serializable {
+    private final String shardId;
+    private final Option<String> lastSequenceNumber;
+    // UTC in milliseconds.
+    private final Option<Long> lastArrivalTime;
+    private final int recordCount;
+    private final boolean reachedEndOfShard;
+  }
+
+  /**
+   * Per-shard fetch result stored in the persisted RDD.
+   * Records are eagerly materialized as List&lt;String&gt; so that both 
fields survive RDD spill
+   * to disk: List and ShardFetchSummary are fully serializable, unlike a 
transient Iterable.
+   */
+  @AllArgsConstructor
+  @Getter
+  private static class ShardFetchResult implements Serializable {
+    private final List<String> records;
+    private final ShardFetchSummary summary;
+  }
+
+  /** Persisted fetch RDD - must be unpersisted in releaseResources to avoid 
memory leak. */
+  private transient org.apache.spark.api.java.JavaRDD<ShardFetchResult> 
persistedFetchRdd;
+  /** Record count from fetch, avoids redundant batch.count() Spark job. */
+  private long lastRecordCount;
+  /** Shard IDs where the executor observed nextShardIterator==null 
(end-of-shard reached). */
+  protected Set<String> shardsReachedEnd;
+  /** Arrival time (epoch millis) of the record with last sequence number, per 
shard. */
+  protected Map<String, Long> lastArrivalTimes;
+
+  public JsonKinesisSource(TypedProperties properties, JavaSparkContext 
sparkContext, SparkSession sparkSession,
+                           SchemaProvider schemaProvider, 
HoodieIngestionMetrics metrics) {
+    this(properties, sparkContext, sparkSession, metrics,
+        new DefaultStreamContext(schemaProvider, Option.empty()));
+  }
+
+  public JsonKinesisSource(TypedProperties properties, JavaSparkContext 
sparkContext, SparkSession sparkSession,
+                           HoodieIngestionMetrics metrics, StreamContext 
streamContext) {
+    super(properties, sparkContext, sparkSession, SourceType.JSON, metrics,
+        new DefaultStreamContext(streamContext.getSchemaProvider(), 
streamContext.getSourceProfileSupplier()));
+    this.offsetGen = new KinesisOffsetGen(props);
+  }
+
+  @Override
+  protected JavaRDD<String> toBatch(KinesisOffsetGen.KinesisShardRange[] 
shardRanges, long sourceLimit) {
+    long numEvents = calculateNumEvents(sourceLimit, props);
+    KinesisReadConfig readConfig = new KinesisReadConfig(
+        offsetGen.getStreamName(),
+        offsetGen.getRegion(),
+        offsetGen.getEndpointUrl().orElse(null),
+        getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_ACCESS_KEY, 
null),
+        getStringWithAltKeys(props, KinesisSourceConfig.KINESIS_SECRET_KEY, 
null),
+        offsetGen.getStartingPositionStrategy(),
+        shouldAddMetaFields,
+        getBooleanWithAltKeys(props, 
KinesisSourceConfig.KINESIS_ENABLE_DEAGGREGATION),
+        getIntWithAltKeys(props, 
KinesisSourceConfig.KINESIS_MAX_RECORDS_PER_REQUEST),
+        getLongWithAltKeys(props, 
KinesisSourceConfig.KINESIS_GET_RECORDS_INTERVAL_MS),
+        // NOTE that: Evenly set the max events per shard.
+        shardRanges.length > 0 ? Math.max(1, numEvents / shardRanges.length) : 
numEvents,
+        getLongWithAltKeys(props, 
KinesisSourceConfig.KINESIS_RETRY_INITIAL_INTERVAL_MS),
+        getLongWithAltKeys(props, 
KinesisSourceConfig.KINESIS_RETRY_MAX_INTERVAL_MS),
+        getLongWithAltKeys(props, 
KinesisSourceConfig.KINESIS_THROTTLE_TIMEOUT_MS));
+
+    JavaRDD<ShardFetchResult> fetchRdd = sparkContext.parallelize(
+        java.util.Arrays.asList(shardRanges), shardRanges.length)
+        .mapPartitions(shardRangeIt -> {
+          List<ShardFetchResult> results = new ArrayList<>();
+          try (KinesisClient client = KinesisOffsetGen.createKinesisClient(
+              readConfig.getRegion(), readConfig.getEndpointUrl(),
+              readConfig.getAccessKey(), readConfig.getSecretKey())) {
+            while (shardRangeIt.hasNext()) {
+              KinesisOffsetGen.KinesisShardRange range = shardRangeIt.next();
+              // Lazy iterator: fetches one GetRecords page at a time, keeping 
only one page in
+              // executor memory instead of the full shard batch. Records are 
GC-eligible as soon
+              // as they are converted to JSON strings below.
+              KinesisSource.ShardRecordIterator recordIt = 
KinesisSource.readShardRecords(
+                  client, readConfig.getStreamName(), range, 
readConfig.getStartingPosition(),
+                  readConfig.getMaxRecordsPerRequest(), 
readConfig.getIntervalMilliSeconds(),
+                  readConfig.getMaxRecordsPerShard(), 
readConfig.isEnableDeaggregation(),
+                  readConfig.getRetryInitialIntervalMs(), 
readConfig.getRetryMaxIntervalMs(),
+                  readConfig.getThrottleTimeoutMs());
+
+              String shardId = range.getShardId();
+              boolean addMetaFields = readConfig.isShouldAddMetaFields();
+              List<String> jsonRecords = new ArrayList<>();
+              long numNull = 0;
+              java.time.Instant lastArrivalTimestamp = null;
+              while (recordIt.hasNext()) {
+                Record r = recordIt.next();
+                lastArrivalTimestamp = r.approximateArrivalTimestamp();
+                String s = recordToJsonStatic(r, shardId, addMetaFields);
+                if (s != null) {
+                  jsonRecords.add(s);
+                } else {
+                  numNull++;
+                }
+              }
+              if (numNull > 0) {
+                log.warn("There are {} null strings for shard id {}", numNull, 
shardId);
+              }
+              // Capture the arrival time of the last record (same record 
whose sequence number
+              // becomes the checkpoint lastSeq) so it can be embedded in the 
checkpoint.
+              Option<Long> lastArrivalTime = lastArrivalTimestamp != null
+                  ? Option.of(lastArrivalTimestamp.toEpochMilli()) : 
Option.empty();
+              // recordCount reflects actual output records (null-filtered), 
not raw Kinesis count.
+              // NOTE: getLastSequenceNumber/isReachedEndOfShard are final 
only after hasNext()==false.
+              ShardFetchSummary summary = new ShardFetchSummary(shardId,
+                  recordIt.getLastSequenceNumber(), lastArrivalTime,
+                  jsonRecords.size(), recordIt.isReachedEndOfShard());
+              results.add(new ShardFetchResult(jsonRecords, summary));
+            }
+          }
+          return results.iterator();
+        });
+
+    if (persistedFetchRdd != null) {
+      persistedFetchRdd.unpersist();
+      persistedFetchRdd = null;
+    }
+    boolean persistFetchRdd = getBooleanWithAltKeys(props, 
KinesisSourceConfig.KINESIS_PERSIST_FETCH_RDD);
+    if (persistFetchRdd) {
+      
fetchRdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK());
+      persistedFetchRdd = fetchRdd;
+    } else {
+      log.debug("{} is false: fetch RDD is not persisted. The same Kinesis 
fetch may run twice (checkpoint + "
+          + "record write), which can cause duplicate records to be written. 
Set to true for correct behavior.",
+          KinesisSourceConfig.KINESIS_PERSIST_FETCH_RDD.key());
+    }
+    // Guard: if anything below throws, unpersist immediately so the cached 
RDD doesn't leak.
+    boolean succeeded = false;
+    try {
+      // Collect basic information that will be used to construct the final 
checkpoint.
+      collectCheckpointInfo(fetchRdd);
+      // RDD for the shard data.
+      JavaRDD<String> recordRdd = fetchRdd.flatMap(r -> 
r.getRecords().iterator());
+      // Apply minimum partitions for downstream parallelism (similar to Kafka 
source)
+      long manualPartitions = getLongWithAltKeys(props, 
KinesisSourceConfig.KINESIS_SOURCE_MANUAL_PARTITIONS);
+      if (manualPartitions > 0) {
+        int targetPartitions = (int) manualPartitions;
+        log.info("Repartitioning from {} shards to {} partitions 
(manualPartitions={})",
+            shardRanges.length, targetPartitions, manualPartitions);
+        recordRdd = recordRdd.repartition(targetPartitions);
+      }
+      succeeded = true;
+      return recordRdd;
+    } finally {
+      if (!succeeded) {
+        releaseResources();
+      }
+    }
+  }
+
+  private static String recordToJsonStatic(Record record, String shardId, 
boolean shouldAddMetaFields) {
+    String dataStr = record.data().asUtf8String();
+    // Pure empty or null records in Kinesis is not meaningful.
+    if (dataStr == null || dataStr.trim().isEmpty()) {
+      return null;
+    }
+    if (shouldAddMetaFields) {
+      try {
+        ObjectNode node = (ObjectNode) OBJECT_MAPPER.readTree(dataStr);
+        node.put("_hoodie_kinesis_source_sequence_number", 
record.sequenceNumber());
+        node.put("_hoodie_kinesis_source_shard_id", shardId);
+        node.put("_hoodie_kinesis_source_partition_key", 
record.partitionKey());
+        if (record.approximateArrivalTimestamp() != null) {
+          node.put("_hoodie_kinesis_source_timestamp",
+              record.approximateArrivalTimestamp().toEpochMilli());
+        }
+        return OBJECT_MAPPER.writeValueAsString(node);
+      } catch (Exception e) {
+        // We can disable the flag for mitigation.
+        throw new HoodieException("Failed to add metadata fields", e);
+      }
+    }
+    return dataStr;
+  }

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   <details>
   <summary>🧩 Analysis chain</summary>
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Check if there's documentation or other handling for JSON array payloads 
in Kinesis sources
   rg -n -C3 "ObjectNode|JsonNode" --type=java 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/
   ```
   
   Repository: yihua/hudi
   
   Length of output: 10606
   
   ---
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Search for JsonKinesisSource tests
   find . -name "*JsonKinesisSource*Test*.java" -o -name 
"*Test*JsonKinesisSource*.java"
   ```
   
   Repository: yihua/hudi
   
   Length of output: 145
   
   ---
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Check if there are any type checks or validations in JsonKinesisSource
   rg -n "isObject|isArray|getNodeType" 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 36
   
   ---
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Look for documentation or comments about JSON payload types
   rg -n -B2 -A2 "JSON|array|object|payload" 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
 | head -50
   ```
   
   Repository: yihua/hudi
   
   Length of output: 1084
   
   ---
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Check JsonKafkaSource to see if it has the same limitation or handles it 
differently
   sed -n '100,130p' 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 1629
   
   ---
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Examine the test file to see if arrays are tested
   cat 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKinesisSource.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 21272
   
   ---
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Get the full catch block context from JsonKinesisSource
   sed -n '226,243p' 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 834
   
   ---
   
   🏁 Script executed:
   
   ```shell
   #!/bin/bash
   # Get the full catch block context from JsonKafkaSource for comparison 
(around line 116)
   sed -n '110,135p' 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
   ```
   
   Repository: yihua/hudi
   
   Length of output: 1172
   
   ---
   
   
   
   </details>
   
   **JSON array payloads will throw when metadata fields are enabled.**
   
   When `shouldAddMetaFields` is `true`, the code attempts an unchecked cast to 
`ObjectNode` (line 228). If the Kinesis record contains valid JSON that is not 
an object—such as a JSON array `[1,2,3]`—this will throw a `ClassCastException` 
wrapped in a `HoodieException`.
   
   Compare this to `JsonKafkaSource`, which handles the same pattern 
differently: it catches exceptions during this operation and gracefully returns 
the original record. The test file `TestJsonKinesisSource.java` also suggests 
this behavior may be unintended, with test 
`testRecordToJsonInvalidJsonWithShouldAddOffsetsReturnsOriginalString()` 
expecting the original string to be returned rather than an exception thrown.
   
   Consider checking the node type before casting (e.g., `node.isObject()`) to 
align with either explicit documentation of supported types or with the 
error-handling approach used in `JsonKafkaSource`.
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java`
   around lines 226 - 243, The code inside the shouldAddMetaFields block blindly
   casts OBJECT_MAPPER.readTree(dataStr) to ObjectNode which throws for 
non-object
   JSON (e.g., arrays); change the logic in JsonKinesisSource so after reading 
the
   JsonNode you check node.isObject() before casting and only add the meta 
fields
   (using node.put(...) and OBJECT_MAPPER.writeValueAsString(node)) when it is 
an
   object; if it is not an object (or if readTree fails), mirror JsonKafkaSource
   behavior by catching exceptions or returning the original dataStr instead of
   throwing a HoodieException, keeping references to shouldAddMetaFields,
   OBJECT_MAPPER.readTree, ObjectNode, and the record.* methods (sequenceNumber,
   partitionKey, approximateArrivalTimestamp) for locating the code to modify.
   ```
   
   </details>
   
   <!-- 
fingerprinting:phantom:poseidon:ocelot:905b5bed-b2bc-4566-aabc-7fe1d943d3ff -->
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/19#discussion_r3036410523)) 
(source:comment#3036410523)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.KinesisSourceConfig;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.KinesisDeaggregator;
+import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen;
+import org.apache.hudi.utilities.streamer.StreamContext;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
+import 
software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+
+@Slf4j
+public abstract class KinesisSource<T> extends Source<T> {
+
+  protected static final String METRIC_NAME_KINESIS_MESSAGE_IN_COUNT = 
"kinesisMessageInCount";
+
+  protected final HoodieIngestionMetrics metrics;
+  protected final SchemaProvider schemaProvider;
+  protected KinesisOffsetGen offsetGen;
+  protected final boolean shouldAddMetaFields;
+  /** Checkpoint data (shardId -> sequenceNumber) collected during toBatch 
execution. Set by subclasses. */
+  protected Map<String, String> lastCheckpointData;
+
+  protected KinesisSource(TypedProperties props, JavaSparkContext 
sparkContext, SparkSession sparkSession,
+                          SourceType sourceType, HoodieIngestionMetrics 
metrics, StreamContext streamContext) {
+    super(props, sparkContext, sparkSession, sourceType, streamContext);
+    this.schemaProvider = streamContext.getSchemaProvider();
+    this.metrics = metrics;
+    this.shouldAddMetaFields = getBooleanWithAltKeys(props, 
KinesisSourceConfig.KINESIS_APPEND_OFFSETS);
+  }
+
+  @Override
+  protected final InputBatch<T> fetchNewData(Option<String> lastCkptStr, long 
sourceLimit) {
+    throw new UnsupportedOperationException("KinesisSource#fetchNewData should 
not be called");
+  }
+
+  @Override
+  protected InputBatch<T> readFromCheckpoint(Option<Checkpoint> 
lastCheckpoint, long sourceLimit) {
+    // STEP 1: Collect all available shards for the stream: open/closed shards.
+    KinesisOffsetGen.KinesisShardRange[] allOpenClosedShardRanges = 
offsetGen.getNextShardRanges(lastCheckpoint, sourceLimit);
+    // STEP 2: Filter out shards with no unread records to avoid unnecessary 
GetRecords calls.
+    boolean useLatestStartingPositionStrategy =
+        offsetGen.getStartingPositionStrategy() == 
KinesisSourceConfig.KinesisStartingPositionStrategy.LATEST;
+    int numShardsBeforeFilter = allOpenClosedShardRanges.length;
+    KinesisOffsetGen.KinesisShardRange[] shardRangesWithUnreadRecords = 
Arrays.stream(allOpenClosedShardRanges)
+        .filter(range -> 
range.hasUnreadRecords(useLatestStartingPositionStrategy))
+        .toArray(KinesisOffsetGen.KinesisShardRange[]::new);
+    if (numShardsBeforeFilter > shardRangesWithUnreadRecords.length) {
+      log.info("Filtered {} shards with no unread records, {} shards remain",
+          numShardsBeforeFilter - shardRangesWithUnreadRecords.length, 
shardRangesWithUnreadRecords.length);
+    }
+    // When nothing to read, return empty batch and previous checkpoint if any.
+    if (shardRangesWithUnreadRecords.length == 0) {
+      
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT,
 0);
+      String checkpointStr = lastCheckpoint.isPresent() ? 
lastCheckpoint.get().getCheckpointKey() : "";
+      return new InputBatch<>(Option.empty(), checkpointStr);
+    }
+    // STEP 3: Otherwise, do the read.
+    T batch = toBatch(shardRangesWithUnreadRecords, sourceLimit);
+    // STEP 4: Generate checkpoint.
+    // Pass allOpenClosedShardRanges so filtered-out shards are preserved in 
the checkpoint; otherwise
+    // next run would re-read them from TRIM_HORIZON and cause duplicates
+    String checkpointStr = createCheckpointFromBatch(batch, 
shardRangesWithUnreadRecords, allOpenClosedShardRanges);
+    // STEP 5: Emit metrics.
+    long totalMsgs = getRecordCount(batch);
+    
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT,
 totalMsgs);
+    log.info("Read {} records from Kinesis stream {} with {} shards, 
checkpoint: {}",
+        totalMsgs, offsetGen.getStreamName(), 
shardRangesWithUnreadRecords.length, checkpointStr);
+
+    return new InputBatch<>(Option.of(batch), checkpointStr);
+  }
+
+  /** Upper bound on consecutive empty GetRecords responses before giving up 
on a shard. */
+  private static final int MAX_EMPTY_RESPONSES_FROM_GET_RECORDS = 100;
+
+  /**
+   * Lazy iterator over records from a single Kinesis shard.
+   *
+   * <p>Records are fetched one GetRecords page at a time; the next page is 
only requested once all
+   * records from the current page have been consumed. This avoids holding the 
full shard batch in
+   * executor memory simultaneously with the caller's output collection.
+   *
+   * <p>After {@link #hasNext()} returns {@code false} callers must read
+   * {@link #getLastSequenceNumber()} and {@link #isReachedEndOfShard()} to 
obtain checkpoint state.
+   *
+   * <p><b>lastSequenceNumber correctness invariant:</b> the sequence number 
is taken from the last
+   * <em>raw</em> Kinesis record (pre-deaggregation) of a page and is only 
committed once all
+   * deaggregated records from that page have been yielded. This guarantees 
the checkpoint never
+   * advances past records that have not yet been returned to the caller.
+   */
+  public static class ShardRecordIterator implements Iterator<Record> {
+    private final KinesisClient client;
+    private final String shardId;
+    private final int maxRecordsPerRequest;
+    private final long requestIntervalMs;
+    private final long maxTotalRecords;
+    private final boolean enableDeaggregation;
+    private final long retryInitialIntervalMs;
+    private final long retryMaxIntervalMs;
+    private final long throttleTimeoutMs;
+
+    /** Current position in the Kinesis shard; null means the shard is 
exhausted. */
+    private String shardIteratorStr;
+    /** Records from the most recently fetched page, ready to be yielded. */
+    private Iterator<Record> currentPage = Collections.emptyIterator();
+    /**
+     * Raw lastSeq of the page currently being consumed. Moved to {@link 
#lastSequenceNumber} only
+     * when the page iterator is fully exhausted, ensuring the checkpoint 
never skips records.
+     */
+    private String pendingPageLastSeq = null;
+    /** Checkpoint-safe lastSeq: reflects only fully-consumed pages. */
+    private String lastSequenceNumber = null;
+    private boolean reachedEndOfShard = false;
+    /** True once no further GetRecords calls should be made. */
+    private boolean fetchingDone = false;
+    private long totalConsumed = 0;
+    private int emptyPageCount = 0;
+
+    /**
+     * Dynamically tuned records-per-request limit.
+     * Halved on each ProvisionedThroughputExceededException and held there 
for the rest of the shard read.
+     */
+    private int currentMaxRecords;
+    /** Epoch ms of the last successful GetRecords call; used to enforce 
{@link #throttleTimeoutMs}. */
+    private long lastSuccessTimeMs;
+
+    public ShardRecordIterator(String initialShardIterator, KinesisClient 
client, String shardId,
+                               int maxRecordsPerRequest, long 
requestIntervalMs, long maxTotalRecords, boolean enableDeaggregation,
+                               long retryInitialIntervalMs, long 
retryMaxIntervalMs, long throttleTimeoutMs) {
+      this.shardIteratorStr = initialShardIterator;
+      this.client = client;
+      this.shardId = shardId;
+      this.maxRecordsPerRequest = maxRecordsPerRequest;
+      this.requestIntervalMs = requestIntervalMs;
+      this.maxTotalRecords = maxTotalRecords;
+      this.enableDeaggregation = enableDeaggregation;
+      this.retryInitialIntervalMs = retryInitialIntervalMs;
+      this.retryMaxIntervalMs = retryMaxIntervalMs;
+      this.throttleTimeoutMs = throttleTimeoutMs;
+      this.currentMaxRecords = maxRecordsPerRequest;
+      this.lastSuccessTimeMs = System.currentTimeMillis();
+    }
+
+    @Override
+    public boolean hasNext() {
+      while (true) {
+        if (currentPage.hasNext()) {
+          return true;
+        }
+        // Current page fully consumed: commit its lastSeq before moving on.
+        commitPendingPageLastSeq();
+        if (fetchingDone) {
+          return false;
+        }
+        fetchNextPage();
+        // Loop: if the page was empty, try fetching again (up to 
MAX_EMPTY_RESPONSES limit).
+      }
+    }
+
+    @Override
+    public Record next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more records for shard " + 
shardId);
+      }
+      totalConsumed++;
+      return currentPage.next();
+    }
+
+    public Option<String> getLastSequenceNumber() {
+      return Option.ofNullable(lastSequenceNumber);
+    }
+
+    public boolean isReachedEndOfShard() {
+      return reachedEndOfShard;
+    }
+
+    private void commitPendingPageLastSeq() {
+      if (pendingPageLastSeq != null) {
+        lastSequenceNumber = pendingPageLastSeq;
+        pendingPageLastSeq = null;
+      }
+    }
+
+    private void fetchNextPage() {
+      if (shardIteratorStr == null || totalConsumed >= maxTotalRecords) {
+        fetchingDone = true;
+        return;
+      }
+      GetRecordsResponse response;
+      int attempt = 0;
+      while (true) {
+        try {
+          response = client.getRecords(
+              GetRecordsRequest.builder()
+                  .shardIterator(shardIteratorStr)
+                  .limit(Math.min(currentMaxRecords, (int) (maxTotalRecords - 
totalConsumed)))
+                  .build());

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Compute the per-request limit in `long` before narrowing it.**
   
   Line 240 casts `maxTotalRecords - totalConsumed` to `int` before taking the 
min. If the remaining quota is larger than `Integer.MAX_VALUE`, that wraps 
negative and sends an invalid `limit` to `GetRecords`, so large-but-valid 
source limits fail at runtime.
   
   <details>
   <summary>Suggested fix</summary>
   
   ```diff
              response = client.getRecords(
                  GetRecordsRequest.builder()
                      .shardIterator(shardIteratorStr)
   -                  .limit(Math.min(currentMaxRecords, (int) (maxTotalRecords 
- totalConsumed)))
   +                  .limit((int) Math.min((long) currentMaxRecords, 
maxTotalRecords - totalConsumed))
                      .build());
   ```
   </details>
   
   <!-- suggestion_start -->
   
   <details>
   <summary>📝 Committable suggestion</summary>
   
   > ‼️ **IMPORTANT**
   > Carefully review the code before committing. Ensure that it accurately 
replaces the highlighted code, contains no missing lines, and has no issues 
with indentation. Thoroughly test & benchmark the code to ensure it meets the 
requirements.
   
   ```suggestion
             response = client.getRecords(
                 GetRecordsRequest.builder()
                     .shardIterator(shardIteratorStr)
                     .limit((int) Math.min((long) currentMaxRecords, 
maxTotalRecords - totalConsumed))
                     .build());
   ```
   
   </details>
   
   <!-- suggestion_end -->
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java`
   around lines 237 - 241, In KinesisSource, compute the remaining per-request
   limit as a long before narrowing to int to avoid overflow: calculate 
remaining =
   maxTotalRecords - totalConsumed (as long), then compute perRequestLimitLong =
   Math.min(currentMaxRecords, remaining) and finally cast perRequestLimitLong 
to
   int only when building GetRecordsRequest.limit; update the
   GetRecordsRequest.builder() call (the code using shardIteratorStr,
   currentMaxRecords, maxTotalRecords, totalConsumed) to use this int-cast of 
the
   safe long-min result so you never pass a negative/overflowed int to
   GetRecordsRequest.
   ```
   
   </details>
   
   <!-- 
fingerprinting:phantom:medusa:grasshopper:2d6277d7-75f4-4e28-b60a-1efa18b71895 
-->
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/19#discussion_r3036410526)) 
(source:comment#3036410526)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to