linliu-code commented on code in PR #18224:
URL: https://github.com/apache/hudi/pull/18224#discussion_r3034648100


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KinesisSourceConfig.java:
##########
@@ -38,116 +37,113 @@
     description = "Configurations controlling the behavior of Kinesis source 
in Hudi Streamer.")
 public class KinesisSourceConfig extends HoodieConfig {
 
-  public static final String KINESIS_CHECKPOINT_TYPE_STRING = "string";
-
   private static final String PREFIX = STREAMER_CONFIG_PREFIX + 
"source.kinesis.";
-  private static final String OLD_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + 
"source.kinesis.";
 
   public static final ConfigProperty<String> KINESIS_STREAM_NAME = 
ConfigProperty
       .key(PREFIX + "stream.name")
       .noDefaultValue()
-      .withAlternatives(OLD_PREFIX + "stream.name")
       .withDocumentation("Kinesis Data Streams stream name.");
 
   public static final ConfigProperty<String> KINESIS_REGION = ConfigProperty
       .key(PREFIX + "region")
       .noDefaultValue()
-      .withAlternatives(OLD_PREFIX + "region")
+      .markAdvanced()
       .withDocumentation("AWS region for the Kinesis stream (e.g., 
us-east-1).");
 
   public static final ConfigProperty<String> KINESIS_ENDPOINT_URL = 
ConfigProperty
       .key(PREFIX + "endpoint.url")
       .noDefaultValue()
-      .withAlternatives(OLD_PREFIX + "endpoint.url")
       .markAdvanced()
       .withDocumentation("Custom endpoint URL for Kinesis (e.g., for 
localstack). "
           + "If not set, uses the default AWS endpoint for the region.");
 
   public static final ConfigProperty<String> KINESIS_ACCESS_KEY = 
ConfigProperty
       .key(PREFIX + "access.key")
       .noDefaultValue()
-      .withAlternatives(OLD_PREFIX + "access.key")
       .markAdvanced()
       .withDocumentation("AWS access key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
           + "If not set with endpoint, uses the default AWS credential 
chain.");
 
   public static final ConfigProperty<String> KINESIS_SECRET_KEY = 
ConfigProperty
       .key(PREFIX + "secret.key")
       .noDefaultValue()
-      .withAlternatives(OLD_PREFIX + "secret.key")
       .markAdvanced()
       .withDocumentation("AWS secret key for Kinesis. Used when connecting to 
custom endpoints (e.g., LocalStack). "
           + "If not set with endpoint, uses the default AWS credential 
chain.");
 
   public static final ConfigProperty<Long> MAX_EVENTS_FROM_KINESIS_SOURCE = 
ConfigProperty
-      .key(STREAMER_CONFIG_PREFIX + "kinesis.source.maxEvents")
+      .key(PREFIX + "max.events")
       .defaultValue(5000000L)
-      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + 
"kinesis.source.maxEvents")
       .markAdvanced()
       .withDocumentation("Maximum number of records obtained in each batch 
from Kinesis.");
 
-  public static final ConfigProperty<Long> KINESIS_SOURCE_MIN_PARTITIONS = 
ConfigProperty
-      .key(PREFIX + "minPartitions")
+  public static final ConfigProperty<Long> KINESIS_SOURCE_MANUAL_PARTITIONS = 
ConfigProperty
+      .key(PREFIX + "manual.partitions")
       .defaultValue(0L)
-      .withAlternatives(OLD_PREFIX + "minPartitions")
       .markAdvanced()
-      .withDocumentation("Desired minimum number of Spark partitions when 
reading from Kinesis. "
+      .withDocumentation("Desired number of Spark partitions when reading from 
Kinesis. "
           + "By default, Hudi has a 1-1 mapping of Kinesis shards to Spark 
partitions. "
-          + "If set to a value greater than the number of shards, the result 
RDD will be repartitioned "
-          + "to increase downstream parallelism. Use 0 for 1-1 mapping.");
+          + "If set to a value greater than 0, the result RDD will be 
repartitioned "
+          + "to increase/decrease downstream parallelism. Use 0 for 1-1 
mapping.");
 
   public static final ConfigProperty<Boolean> KINESIS_APPEND_OFFSETS = 
ConfigProperty
       .key(PREFIX + "append.offsets")
-      .defaultValue(false)
-      .withAlternatives(OLD_PREFIX + "append.offsets")
+      .defaultValue(true)
       .markAdvanced()
       .withDocumentation("When enabled, appends Kinesis metadata (sequence 
number, shard id, arrival timestamp, partition key) to records.");
 
   public static final ConfigProperty<Boolean> KINESIS_ENABLE_DEAGGREGATION = 
ConfigProperty
       .key(PREFIX + "enable.deaggregation")
       .defaultValue(true)
-      .withAlternatives(OLD_PREFIX + "enable.deaggregation")
       .markAdvanced()
       .withDocumentation("When enabled, de-aggregates records produced by 
Kinesis Producer Library (KPL). "
           + "Non-aggregated records pass through unchanged. Set to false if 
producers do not use KPL.");
 
   public static final ConfigProperty<Boolean> ENABLE_FAIL_ON_DATA_LOSS = 
ConfigProperty
-      .key(PREFIX + "enable.failOnDataLoss")
+      .key(PREFIX + "enable.fail.on.data.loss")
       .defaultValue(false)
-      .withAlternatives(OLD_PREFIX + "enable.failOnDataLoss")
       .markAdvanced()
-      .withDocumentation("Fail when checkpoint references an expired shard 
instead of seeking to TRIM_HORIZON.");
+      .withDocumentation("Fail when checkpoint references an expired shard 
which has not been fully consumed.");
 
   public static final ConfigProperty<String> KINESIS_STARTING_POSITION = 
ConfigProperty
       .key(PREFIX + "starting.position")
       .defaultValue("LATEST")
-      .withAlternatives(OLD_PREFIX + "starting.position")
       .markAdvanced()
-      .withDocumentation("Starting position when no checkpoint exists. 
TRIM_HORIZON (or EARLIEST), or LATEST. Default: LATEST.");
+      .withDocumentation("Starting position when no checkpoint exists. 
EARLIEST or LATEST. Default: LATEST.");
 
-  public static final ConfigProperty<Integer> KINESIS_GET_RECORDS_MAX_RECORDS 
= ConfigProperty
-      .key(PREFIX + "getRecords.maxRecords")
+  public static final ConfigProperty<Integer> KINESIS_MAX_RECORDS_PER_REQUEST 
= ConfigProperty
+      .key(PREFIX + "max.records.per.request")
       .defaultValue(10000)
-      .withAlternatives(OLD_PREFIX + "getRecords.maxRecords")
       .markAdvanced()
       .withDocumentation("Maximum number of records to fetch per GetRecords 
API call. Kinesis limit is 10000.");
 
   public static final ConfigProperty<Long> KINESIS_GET_RECORDS_INTERVAL_MS = 
ConfigProperty
-      .key(PREFIX + "getRecords.intervalMs")
+      .key(PREFIX + "get_records.interval.ms")

Review Comment:
   This is to match the function name: getRecords, which I suppose to be more 
intuitive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to