CrynetLogistics commented on a change in pull request #18165:
URL: https://github.com/apache/flink/pull/18165#discussion_r783330641



##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
 this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured.
 Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.
 
-## Kinesis Producer
-
-The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.
-
-Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
+## Kinesis Data Streams Sink
 
-In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
+The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream.
 
-To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
+To write data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
 
 For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
 
 {{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}}
 {{< tab "Java" >}}
 ```java
-Properties producerConfig = new Properties();
-// Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-// Optional configs
-producerConfig.put("AggregationMaxCount", "4294967295");
-producerConfig.put("CollectionMaxCount", "1000");
-producerConfig.put("RecordTtl", "30000");
-producerConfig.put("RequestTimeout", "6000");
-producerConfig.put("ThreadPoolSize", "15");
-
-// Disable Aggregation if it's not supported by a consumer
-// producerConfig.put("AggregationEnabled", "false");
-// Switch KinesisProducer's threading model
-// producerConfig.put("ThreadingModel", "PER_REQUEST");
-
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
-kinesis.setFailOnError(true);
-kinesis.setDefaultStream("kinesis_stream_name");
-kinesis.setDefaultPartition("0");
+ElementConverter<String, PutRecordsRequestEntry> elementConverter =
+    KinesisDataStreamsSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
+        .build();
+
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+KinesisDataStreamsSink<String> kdsSink =
+    KinesisDataStreamsSink.<String>builder()
+        .setKinesisClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setStreamName("your-stream-name")             // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(16)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(5 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1 * 1024 * 1024)      // Optional
+        .build();
 
 DataStream<String> simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
+simpleStringStream.sinkTo(kdsSink);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val producerConfig = new Properties()
-// Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
-// Optional KPL configs
-producerConfig.put("AggregationMaxCount", "4294967295")
-producerConfig.put("CollectionMaxCount", "1000")
-producerConfig.put("RecordTtl", "30000")
-producerConfig.put("RequestTimeout", "6000")
-producerConfig.put("ThreadPoolSize", "15")
-
-// Disable Aggregation if it's not supported by a consumer
-// producerConfig.put("AggregationEnabled", "false")
-// Switch KinesisProducer's threading model
-// producerConfig.put("ThreadingModel", "PER_REQUEST")
-
-val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
-kinesis.setFailOnError(true)
-kinesis.setDefaultStream("kinesis_stream_name")
-kinesis.setDefaultPartition("0")
+val elementConverter =
+    KinesisDataStreamsSinkElementConverter.builder[String]()
+        .setSerializationSchema(new SimpleStringSchema())
+        .setPartitionKeyGenerator(element => String.valueOf(element.hashCode()))
+        .build()
+
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val kdsSink = KinesisDataStreamsSink.builder[String]()
+    .setKinesisClientProperties(sinkProperties)  // Required
+    .setElementConverter(elementConverter)       // Required
+    .setStreamName("your-stream-name")           // Required
+    .setFailOnError(false)                       // Optional
+    .setMaxBatchSize(500)                        // Optional
+    .setMaxInFlightRequests(16)                  // Optional
+    .setMaxBufferedRequests(10000)               // Optional
+    .setMaxBatchSizeInBytes(5 * 1024 * 1024)     // Optional
+    .setMaxTimeInBufferMS(5000)                  // Optional
+    .setMaxRecordSizeInBytes(1 * 1024 * 1024)    // Optional
+    .build()
 
 val simpleStringStream = ...
-simpleStringStream.addSink(kinesis)
+simpleStringStream.sinkTo(kdsSink)
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
-The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL's configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1".
+The above is a simple example of using the Kinesis sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above.
 
-If users don't specify any KPL configs and values, `FlinkKinesisProducer` will use default config values of KPL, except `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL's default value is 150 but it makes KPL throw `RateLimitExceededException` too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL's default value to 100.
+You will always need to supply a `KinesisDataStreamsSinkElementConverter` during sink creation. This is where you specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record.

Review comment:
   So the customer can create an `ElementConverter` on their own, without using our 'route' of doing `KinesisDataStreamsSinkElementConverter.<>builder() ... .build()`.
   
   However, it must always be an `ElementConverter<?, PutRecordsRequestEntry>`, i.e. they have to output a `PutRecordsRequestEntry`. If they use `PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(serializationSchema.serialize(element))).partitionKey(partitionKeyGenerator.apply(element)).build();`, then I believe it will complain if there is no partition key.
   In any case, they must provide a way of creating a `PutRecordsRequestEntry`, which involves providing a valid partition key.
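
A minimal sketch of such a hand-rolled converter, for illustration only: it assumes the `ElementConverter` interface from `flink-connector-base` (the import paths below match this PR's Flink version as far as I know, and may differ) plus the v2 SDK's `PutRecordsRequestEntry`; the class name and the hash-based partition key are made up.

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

/** Hypothetical converter written without the KinesisDataStreamsSinkElementConverter builder. */
public class HashPartitioningStringConverter
        implements ElementConverter<String, PutRecordsRequestEntry> {

    private final SerializationSchema<String> serializationSchema = new SimpleStringSchema();

    @Override
    public PutRecordsRequestEntry apply(String element, SinkWriter.Context context) {
        return PutRecordsRequestEntry.builder()
                .data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
                // A non-empty partition key is mandatory; PutRecords rejects entries without one.
                .partitionKey(String.valueOf(element.hashCode()))
                .build();
    }
}
```

An instance of this converter can then be passed to `setElementConverter(...)` on the sink builder, in place of one produced by `KinesisDataStreamsSinkElementConverter`.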



