yihua commented on code in PR #18224:
URL: https://github.com/apache/hudi/pull/18224#discussion_r3036400342


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
+
+/**
+ * Kinesis Source Configs for Hudi Streamer.
+ */
+@Immutable
+@ConfigClassProperty(name = "Kinesis Source Configs",
+    groupName = ConfigGroups.Names.HUDI_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE,
+    description = "Configurations controlling the behavior of Kinesis source 
in Hudi Streamer.")
+public class KinesisSourceConfig extends HoodieConfig {
+
+  private static final String PREFIX = STREAMER_CONFIG_PREFIX + 
"source.kinesis.";
+
+  public static final ConfigProperty<String> KINESIS_STREAM_NAME = 
ConfigProperty
+      .key(PREFIX + "stream.name")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .withDocumentation("Kinesis Data Streams stream name.");
+
+  public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty
+      .key(PREFIX + "region")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS region for the Kinesis stream (e.g., 
us-east-1).");
+
+  public static final ConfigProperty<String> KINESIS_ENDPOINT_URL = 
ConfigProperty
+      .key(PREFIX + "endpoint.url")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Custom endpoint URL for Kinesis (e.g., for 
localstack). "
+          + "If not set, uses the default AWS endpoint for the region.");
+
+  public static final ConfigProperty<String> KINESIS_ACCESS_KEY = 
ConfigProperty
+      .key(PREFIX + "access.key")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS access key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
+          + "If not set with endpoint, uses the default AWS credential 
chain.");
+
+  public static final ConfigProperty<String> KINESIS_SECRET_KEY = 
ConfigProperty
+      .key(PREFIX + "secret.key")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS secret key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
+          + "If not set with endpoint, uses the default AWS credential 
chain.");
+
+  public static final ConfigProperty<Long> MAX_EVENTS_FROM_KINESIS_SOURCE = 
ConfigProperty
+      .key(PREFIX + "max.events")
+      .defaultValue(5000000L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum number of records obtained in each batch 
from Kinesis.");
+
+  public static final ConfigProperty<Long> KINESIS_SOURCE_MANUAL_PARTITIONS = 
ConfigProperty
+      .key(PREFIX + "partitions")
+      .defaultValue(0L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Desired number of Spark partitions when reading from 
Kinesis. "
+          + "By default, Hudi has a 1-1 mapping of Kinesis shards to Spark 
partitions. "
+          + "If set to a value greater than 0, the result RDD will be 
repartitioned "
+          + "to increase/decrease downstream parallelism. Use 0 for 1-1 
mapping.");
+
+  public static final ConfigProperty<Boolean> KINESIS_APPEND_OFFSETS = 
ConfigProperty
+      .key(PREFIX + "append.offsets")
+      .defaultValue(false)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("When enabled, appends Kinesis metadata (sequence 
number, shard id, arrival timestamp, partition key) to records.");
+
+  public static final ConfigProperty<Boolean> KINESIS_ENABLE_DEAGGREGATION = 
ConfigProperty
+      .key(PREFIX + "enable.deaggregation")
+      .defaultValue(true)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("When enabled, de-aggregates records produced by 
Kinesis Producer Library (KPL). "
+          + "Non-aggregated records pass through unchanged. Set to false if 
producers do not use KPL.");
+
+  public static final ConfigProperty<Boolean> ENABLE_FAIL_ON_DATA_LOSS = 
ConfigProperty
+      .key(PREFIX + "enable.fail.on.data.loss")
+      .defaultValue(false)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Fail when checkpoint references an expired shard 
which has not been fully consumed.");
+
+  public static final ConfigProperty<String> KINESIS_STARTING_POSITION = 
ConfigProperty
+      .key(PREFIX + "starting.position")
+      .defaultValue("LATEST")
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Starting position when no checkpoint exists. 
EARLIEST or LATEST. Default: LATEST.");
+
+  public static final ConfigProperty<Integer> KINESIS_MAX_RECORDS_PER_REQUEST 
= ConfigProperty
+      .key(PREFIX + "max.records.per.request")
+      .defaultValue(10000)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum number of records to fetch per GetRecords 
API call. Kinesis limit is 10000.");
+
+  public static final ConfigProperty<Long> KINESIS_GET_RECORDS_INTERVAL_MS = 
ConfigProperty
+      .key(PREFIX + "get_records.interval.ms")
+      .defaultValue(200L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Minimum interval in ms between two GetRecords API 
calls per shard.");
+
+  public static final ConfigProperty<Long> KINESIS_RETRY_INITIAL_INTERVAL_MS = 
ConfigProperty
+      .key(PREFIX + "retry.initial_interval_ms")
+      .defaultValue(1000L)
+      .sinceVersion("1.2.0")

Review Comment:
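   🤖 nit: the config key `retry.initial_interval_ms` uses underscores; use dots 
instead (e.g., `retry.initial.interval.ms`) to match the Hudi convention used 
throughout this file. The sibling keys `retry.max_interval_ms` and 
`retry.throttle_timeout_ms` have the same issue, and this property's 
documentation refers to `retry.max_interval_ms`, so that reference should be 
updated if the keys are renamed.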



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
+
+/**
+ * Kinesis Source Configs for Hudi Streamer.
+ */
+@Immutable
+@ConfigClassProperty(name = "Kinesis Source Configs",
+    groupName = ConfigGroups.Names.HUDI_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE,
+    description = "Configurations controlling the behavior of Kinesis source 
in Hudi Streamer.")
+public class KinesisSourceConfig extends HoodieConfig {
+
+  private static final String PREFIX = STREAMER_CONFIG_PREFIX + 
"source.kinesis.";
+
+  public static final ConfigProperty<String> KINESIS_STREAM_NAME = 
ConfigProperty
+      .key(PREFIX + "stream.name")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .withDocumentation("Kinesis Data Streams stream name.");
+
+  public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty
+      .key(PREFIX + "region")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS region for the Kinesis stream (e.g., 
us-east-1).");
+
+  public static final ConfigProperty<String> KINESIS_ENDPOINT_URL = 
ConfigProperty
+      .key(PREFIX + "endpoint.url")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Custom endpoint URL for Kinesis (e.g., for 
localstack). "
+          + "If not set, uses the default AWS endpoint for the region.");
+
+  public static final ConfigProperty<String> KINESIS_ACCESS_KEY = 
ConfigProperty
+      .key(PREFIX + "access.key")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS access key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
+          + "If not set with endpoint, uses the default AWS credential 
chain.");
+
+  public static final ConfigProperty<String> KINESIS_SECRET_KEY = 
ConfigProperty
+      .key(PREFIX + "secret.key")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS secret key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
+          + "If not set with endpoint, uses the default AWS credential 
chain.");
+
+  public static final ConfigProperty<Long> MAX_EVENTS_FROM_KINESIS_SOURCE = 
ConfigProperty
+      .key(PREFIX + "max.events")
+      .defaultValue(5000000L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum number of records obtained in each batch 
from Kinesis.");
+
+  public static final ConfigProperty<Long> KINESIS_SOURCE_MANUAL_PARTITIONS = 
ConfigProperty
+      .key(PREFIX + "partitions")
+      .defaultValue(0L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Desired number of Spark partitions when reading from 
Kinesis. "
+          + "By default, Hudi has a 1-1 mapping of Kinesis shards to Spark 
partitions. "
+          + "If set to a value greater than 0, the result RDD will be 
repartitioned "
+          + "to increase/decrease downstream parallelism. Use 0 for 1-1 
mapping.");
+
+  public static final ConfigProperty<Boolean> KINESIS_APPEND_OFFSETS = 
ConfigProperty
+      .key(PREFIX + "append.offsets")
+      .defaultValue(false)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("When enabled, appends Kinesis metadata (sequence 
number, shard id, arrival timestamp, partition key) to records.");
+
+  public static final ConfigProperty<Boolean> KINESIS_ENABLE_DEAGGREGATION = 
ConfigProperty
+      .key(PREFIX + "enable.deaggregation")
+      .defaultValue(true)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("When enabled, de-aggregates records produced by 
Kinesis Producer Library (KPL). "
+          + "Non-aggregated records pass through unchanged. Set to false if 
producers do not use KPL.");
+
+  public static final ConfigProperty<Boolean> ENABLE_FAIL_ON_DATA_LOSS = 
ConfigProperty
+      .key(PREFIX + "enable.fail.on.data.loss")
+      .defaultValue(false)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Fail when checkpoint references an expired shard 
which has not been fully consumed.");
+
+  public static final ConfigProperty<String> KINESIS_STARTING_POSITION = 
ConfigProperty
+      .key(PREFIX + "starting.position")
+      .defaultValue("LATEST")
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Starting position when no checkpoint exists. 
EARLIEST or LATEST. Default: LATEST.");
+
+  public static final ConfigProperty<Integer> KINESIS_MAX_RECORDS_PER_REQUEST 
= ConfigProperty
+      .key(PREFIX + "max.records.per.request")
+      .defaultValue(10000)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum number of records to fetch per GetRecords 
API call. Kinesis limit is 10000.");
+

Review Comment:
   🤖 nit: the config key `get_records.interval.ms` (used by 
`KINESIS_GET_RECORDS_INTERVAL_MS`) contains an underscore, but the convention 
throughout KinesisSourceConfig and Hudi is dots (e.g., `stream.name`, 
`enable.deaggregation`). Consider `get.records.interval.ms` for consistency.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
+
+/**
+ * Kinesis Source Configs for Hudi Streamer.
+ */
+@Immutable
+@ConfigClassProperty(name = "Kinesis Source Configs",
+    groupName = ConfigGroups.Names.HUDI_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE,
+    description = "Configurations controlling the behavior of Kinesis source 
in Hudi Streamer.")
+public class KinesisSourceConfig extends HoodieConfig {
+
+  private static final String PREFIX = STREAMER_CONFIG_PREFIX + 
"source.kinesis.";
+
+  public static final ConfigProperty<String> KINESIS_STREAM_NAME = 
ConfigProperty
+      .key(PREFIX + "stream.name")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .withDocumentation("Kinesis Data Streams stream name.");
+
+  public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty
+      .key(PREFIX + "region")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS region for the Kinesis stream (e.g., 
us-east-1).");
+
+  public static final ConfigProperty<String> KINESIS_ENDPOINT_URL = 
ConfigProperty
+      .key(PREFIX + "endpoint.url")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Custom endpoint URL for Kinesis (e.g., for 
localstack). "
+          + "If not set, uses the default AWS endpoint for the region.");
+
+  public static final ConfigProperty<String> KINESIS_ACCESS_KEY = 
ConfigProperty
+      .key(PREFIX + "access.key")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS access key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
+          + "If not set with endpoint, uses the default AWS credential 
chain.");
+
+  public static final ConfigProperty<String> KINESIS_SECRET_KEY = 
ConfigProperty
+      .key(PREFIX + "secret.key")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS secret key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
+          + "If not set with endpoint, uses the default AWS credential 
chain.");
+
+  public static final ConfigProperty<Long> MAX_EVENTS_FROM_KINESIS_SOURCE = 
ConfigProperty
+      .key(PREFIX + "max.events")
+      .defaultValue(5000000L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum number of records obtained in each batch 
from Kinesis.");
+
+  public static final ConfigProperty<Long> KINESIS_SOURCE_MANUAL_PARTITIONS = 
ConfigProperty
+      .key(PREFIX + "partitions")
+      .defaultValue(0L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Desired number of Spark partitions when reading from 
Kinesis. "
+          + "By default, Hudi has a 1-1 mapping of Kinesis shards to Spark 
partitions. "
+          + "If set to a value greater than 0, the result RDD will be 
repartitioned "
+          + "to increase/decrease downstream parallelism. Use 0 for 1-1 
mapping.");
+
+  public static final ConfigProperty<Boolean> KINESIS_APPEND_OFFSETS = 
ConfigProperty
+      .key(PREFIX + "append.offsets")
+      .defaultValue(false)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("When enabled, appends Kinesis metadata (sequence 
number, shard id, arrival timestamp, partition key) to records.");
+
+  public static final ConfigProperty<Boolean> KINESIS_ENABLE_DEAGGREGATION = 
ConfigProperty
+      .key(PREFIX + "enable.deaggregation")
+      .defaultValue(true)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("When enabled, de-aggregates records produced by 
Kinesis Producer Library (KPL). "
+          + "Non-aggregated records pass through unchanged. Set to false if 
producers do not use KPL.");
+
+  public static final ConfigProperty<Boolean> ENABLE_FAIL_ON_DATA_LOSS = 
ConfigProperty
+      .key(PREFIX + "enable.fail.on.data.loss")
+      .defaultValue(false)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Fail when checkpoint references an expired shard 
which has not been fully consumed.");
+
+  public static final ConfigProperty<String> KINESIS_STARTING_POSITION = 
ConfigProperty
+      .key(PREFIX + "starting.position")
+      .defaultValue("LATEST")
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Starting position when no checkpoint exists. 
EARLIEST or LATEST. Default: LATEST.");
+
+  public static final ConfigProperty<Integer> KINESIS_MAX_RECORDS_PER_REQUEST 
= ConfigProperty
+      .key(PREFIX + "max.records.per.request")
+      .defaultValue(10000)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum number of records to fetch per GetRecords 
API call. Kinesis limit is 10000.");
+
+  public static final ConfigProperty<Long> KINESIS_GET_RECORDS_INTERVAL_MS = 
ConfigProperty
+      .key(PREFIX + "get_records.interval.ms")
+      .defaultValue(200L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Minimum interval in ms between two GetRecords API 
calls per shard.");
+
+  public static final ConfigProperty<Long> KINESIS_RETRY_INITIAL_INTERVAL_MS = 
ConfigProperty
+      .key(PREFIX + "retry.initial_interval_ms")
+      .defaultValue(1000L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Initial backoff in ms when Kinesis returns 
ProvisionedThroughputExceededException. "
+          + "Backoff doubles each retry up to retry.max_interval_ms.");
+
+  public static final ConfigProperty<Long> KINESIS_RETRY_MAX_INTERVAL_MS = 
ConfigProperty
+      .key(PREFIX + "retry.max_interval_ms")
+      .defaultValue(10000L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum backoff in ms between retries for throughput 
exceeded.");
+
+  public static final ConfigProperty<Long> KINESIS_THROTTLE_TIMEOUT_MS = 
ConfigProperty
+      .key(PREFIX + "retry.throttle_timeout_ms")
+      .defaultValue(600000L)
+      .sinceVersion("1.2.0")

Review Comment:
   🤖 nit: the config key `retry.throttle_timeout_ms` uses underscores; use dots 
instead (e.g., `retry.throttle.timeout.ms`) to match the Hudi convention used 
throughout this file. The same applies to `retry.max_interval_ms` (e.g., 
`retry.max.interval.ms`).



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/KinesisTestUtils.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import com.amazonaws.kinesis.agg.AggRecord;
+import com.amazonaws.kinesis.agg.RecordAggregator;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.ScalingType;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;
+
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.ByteBuffer;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * Test utilities for Kinesis Data Streams using LocalStack.
+ */
+public class KinesisTestUtils {
+  private static final DockerImageName LOCALSTACK_IMAGE =
+      DockerImageName.parse("localstack/localstack:4.1.0");
+
+  private LocalStackContainer localStack;
+  private KinesisClient kinesisClient;
+
+  public KinesisTestUtils setup() {
+    localStack = new LocalStackContainer(LOCALSTACK_IMAGE)
+        .withServices(LocalStackContainer.Service.KINESIS);
+    localStack.start();
+    kinesisClient = KinesisClient.builder()
+        
.endpointOverride(localStack.getEndpointOverride(LocalStackContainer.Service.KINESIS))
+        .credentialsProvider(
+            StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(localStack.getAccessKey(), 
localStack.getSecretKey())))
+        .region(Region.of(localStack.getRegion()))
+        .build();
+    return this;
+  }
+
+  public String getEndpointUrl() {
+    if (localStack == null || !localStack.isRunning()) {
+      throw new IllegalStateException("LocalStack container is not running. 
Please start the container first.");
+    }
+    URI endpoint = 
localStack.getEndpointOverride(LocalStackContainer.Service.KINESIS);
+    return endpoint.toString();
+  }
+
+  public String getRegion() {
+    if (localStack == null || !localStack.isRunning()) {
+      throw new IllegalStateException("LocalStack container is not running. 
Please start the container first.");
+    }
+    return localStack.getRegion();
+  }
+
+  public String getAccessKey() {
+    if (localStack == null || !localStack.isRunning()) {
+      throw new IllegalStateException("LocalStack container is not running. 
Please start the container first.");
+    }
+    return localStack.getAccessKey();
+  }
+
+  public String getSecretKey() {
+    if (localStack == null || !localStack.isRunning()) {
+      throw new IllegalStateException("LocalStack container is not running. 
Please start the container first.");
+    }
+    return localStack.getSecretKey();
+  }
+
+  public void createStream(String streamName) {
+    createStream(streamName, 1);
+  }
+
+  public void createStream(String streamName, int shardCount) {
+    kinesisClient.createStream(
+        CreateStreamRequest.builder()
+            .streamName(streamName)
+            .shardCount(shardCount)
+            .build());
+    waitForStreamActive(streamName);
+  }
+
+  private void waitForStreamActive(String streamName) {
+    waitForStreamActive(streamName, 30);
+  }
+
+  private void waitForStreamActive(String streamName, int maxAttempts) {
+    try {
+      for (int i = 0; i < maxAttempts; i++) {
+        String status = kinesisClient.describeStream(
+            DescribeStreamRequest.builder().streamName(streamName).build())
+            .streamDescription().streamStatus().toString();
+        if (StreamStatus.ACTIVE.toString().equals(status)) {
+          return;
+        }
+        Thread.sleep(500);
+      }
+      throw new RuntimeException("Stream " + streamName + " did not become 
ACTIVE in time");
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while waiting for stream", e);
+    }
+  }
+
+  public void sendRecords(String streamName, List<String> records) {
+    for (String record : records) {
+      kinesisClient.putRecord(
+          PutRecordRequest.builder()
+              .streamName(streamName)
+              .partitionKey(String.valueOf(System.nanoTime()))
+              .data(SdkBytes.fromUtf8String(record))
+              .build());
+    }
+  }
+
+  public void sendRecords(String streamName, String[] records) {
+    for (String record : records) {
+      kinesisClient.putRecord(
+          PutRecordRequest.builder()
+              .streamName(streamName)
+              .partitionKey(String.valueOf(System.nanoTime()))

Review Comment:
   🤖 nit: the `sendRecords(List)` and `sendRecords(String[])` overloads have 
identical loop bodies. Have you considered keeping only the `List` version, or 
having the array overload delegate to it (e.g., 
`sendRecords(streamName, Arrays.asList(records))`)?



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/KinesisTestUtils.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import com.amazonaws.kinesis.agg.AggRecord;
+import com.amazonaws.kinesis.agg.RecordAggregator;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.ScalingType;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;
+
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.nio.ByteBuffer;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * Test utilities for Kinesis Data Streams using LocalStack.
+ */
+public class KinesisTestUtils {
+  private static final DockerImageName LOCALSTACK_IMAGE =
+      DockerImageName.parse("localstack/localstack:4.1.0");
+
+  private LocalStackContainer localStack;
+  private KinesisClient kinesisClient;
+
+  public KinesisTestUtils setup() {
+    localStack = new LocalStackContainer(LOCALSTACK_IMAGE)
+        .withServices(LocalStackContainer.Service.KINESIS);
+    localStack.start();
+    kinesisClient = KinesisClient.builder()
+        
.endpointOverride(localStack.getEndpointOverride(LocalStackContainer.Service.KINESIS))
+        .credentialsProvider(
+            StaticCredentialsProvider.create(
+                AwsBasicCredentials.create(localStack.getAccessKey(), 
localStack.getSecretKey())))
+        .region(Region.of(localStack.getRegion()))
+        .build();
+    return this;
+  }
+
+  public String getEndpointUrl() {
+    if (localStack == null || !localStack.isRunning()) {
+      throw new IllegalStateException("LocalStack container is not running. 
Please start the container first.");

Review Comment:
   🤖 nit: the `if (localStack == null || !localStack.isRunning())` check is 
repeated in `getEndpointUrl()`, `getRegion()`, `getAccessKey()`, and 
`getSecretKey()`; could you extract it into a private helper (e.g., 
`ensureRunning()`)?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
+
+/**
+ * Kinesis Source Configs for Hudi Streamer.
+ */
+@Immutable
+@ConfigClassProperty(name = "Kinesis Source Configs",
+    groupName = ConfigGroups.Names.HUDI_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE,
+    description = "Configurations controlling the behavior of Kinesis source 
in Hudi Streamer.")
+public class KinesisSourceConfig extends HoodieConfig {
+
+  private static final String PREFIX = STREAMER_CONFIG_PREFIX + 
"source.kinesis.";
+
+  public static final ConfigProperty<String> KINESIS_STREAM_NAME = 
ConfigProperty
+      .key(PREFIX + "stream.name")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .withDocumentation("Kinesis Data Streams stream name.");
+
+  public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty
+      .key(PREFIX + "region")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS region for the Kinesis stream (e.g., 
us-east-1).");
+
+  public static final ConfigProperty<String> KINESIS_ENDPOINT_URL = 
ConfigProperty
+      .key(PREFIX + "endpoint.url")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Custom endpoint URL for Kinesis (e.g., for 
localstack). "
+          + "If not set, uses the default AWS endpoint for the region.");
+
+  public static final ConfigProperty<String> KINESIS_ACCESS_KEY = 
ConfigProperty
+      .key(PREFIX + "access.key")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS access key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
+          + "If not set with endpoint, uses the default AWS credential 
chain.");
+
+  public static final ConfigProperty<String> KINESIS_SECRET_KEY = 
ConfigProperty
+      .key(PREFIX + "secret.key")
+      .noDefaultValue()
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("AWS secret key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
+          + "If not set with endpoint, uses the default AWS credential 
chain.");
+
+  public static final ConfigProperty<Long> MAX_EVENTS_FROM_KINESIS_SOURCE = 
ConfigProperty
+      .key(PREFIX + "max.events")
+      .defaultValue(5000000L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum number of records obtained in each batch 
from Kinesis.");
+
+  public static final ConfigProperty<Long> KINESIS_SOURCE_MANUAL_PARTITIONS = 
ConfigProperty
+      .key(PREFIX + "partitions")
+      .defaultValue(0L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Desired number of Spark partitions when reading from 
Kinesis. "
+          + "By default, Hudi has a 1-1 mapping of Kinesis shards to Spark 
partitions. "
+          + "If set to a value greater than 0, the result RDD will be 
repartitioned "
+          + "to increase/decrease downstream parallelism. Use 0 for 1-1 
mapping.");
+
+  public static final ConfigProperty<Boolean> KINESIS_APPEND_OFFSETS = 
ConfigProperty
+      .key(PREFIX + "append.offsets")
+      .defaultValue(false)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("When enabled, appends Kinesis metadata (sequence 
number, shard id, arrival timestamp, partition key) to records.");
+
+  public static final ConfigProperty<Boolean> KINESIS_ENABLE_DEAGGREGATION = 
ConfigProperty
+      .key(PREFIX + "enable.deaggregation")
+      .defaultValue(true)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("When enabled, de-aggregates records produced by 
Kinesis Producer Library (KPL). "
+          + "Non-aggregated records pass through unchanged. Set to false if 
producers do not use KPL.");
+
+  public static final ConfigProperty<Boolean> ENABLE_FAIL_ON_DATA_LOSS = 
ConfigProperty
+      .key(PREFIX + "enable.fail.on.data.loss")
+      .defaultValue(false)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Fail when checkpoint references an expired shard 
which has not been fully consumed.");
+
+  public static final ConfigProperty<String> KINESIS_STARTING_POSITION = 
ConfigProperty
+      .key(PREFIX + "starting.position")
+      .defaultValue("LATEST")
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Starting position when no checkpoint exists. 
EARLIEST or LATEST. Default: LATEST.");
+
+  public static final ConfigProperty<Integer> KINESIS_MAX_RECORDS_PER_REQUEST 
= ConfigProperty
+      .key(PREFIX + "max.records.per.request")
+      .defaultValue(10000)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Maximum number of records to fetch per GetRecords 
API call. Kinesis limit is 10000.");
+
+  public static final ConfigProperty<Long> KINESIS_GET_RECORDS_INTERVAL_MS = 
ConfigProperty
+      .key(PREFIX + "get_records.interval.ms")
+      .defaultValue(200L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Minimum interval in ms between two GetRecords API 
calls per shard.");
+
+  public static final ConfigProperty<Long> KINESIS_RETRY_INITIAL_INTERVAL_MS = 
ConfigProperty
+      .key(PREFIX + "retry.initial_interval_ms")
+      .defaultValue(1000L)
+      .sinceVersion("1.2.0")
+      .markAdvanced()
+      .withDocumentation("Initial backoff in ms when Kinesis returns 
ProvisionedThroughputExceededException. "
+          + "Backoff doubles each retry up to retry.max_interval_ms.");
+
+  public static final ConfigProperty<Long> KINESIS_RETRY_MAX_INTERVAL_MS = 
ConfigProperty
+      .key(PREFIX + "retry.max_interval_ms")
+      .defaultValue(10000L)
+      .sinceVersion("1.2.0")

Review Comment:
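   🤖 nit: config key uses underscores in `max_interval_ms`; use dots instead 
(e.g., `retry.max.interval.ms`) to match the dot-separated Hudi config key 
convention. The same applies to `retry.initial_interval_ms` and 
`get_records.interval.ms` earlier in this file.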



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to