hlteoh37 commented on code in PR #179:
URL: 
https://github.com/apache/flink-connector-aws/pull/179#discussion_r1832538357


##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -27,644 +27,348 @@ under the License.
 
 # Amazon Kinesis Data Streams Connector
 
-The Kinesis connector provides access to [Amazon Kinesis Data 
Streams](http://aws.amazon.com/kinesis/streams/).
-
-To use this connector, add one or more of the following dependencies to your 
project, depending on whether you are reading from and/or writing to Kinesis 
Data Streams:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">KDS Connectivity</th>
-      <th class="text-left">Maven Dependency</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>Source</td>
-        <td>{{< connector_artifact flink-connector-kinesis kinesis >}}</td>
-    </tr>
-    <tr>
-        <td>Sink</td>
-        <td>{{< connector_artifact flink-connector-aws-kinesis-streams kinesis 
>}}</td>
-    </tr>
-  </tbody>
-</table>
+The Kinesis connector allows users to read/write from [Amazon Kinesis Data 
Streams](http://aws.amazon.com/kinesis/streams/).
 
-{{< py_connector_download_link "kinesis" >}}
+## Dependency
 
-## Using the Amazon Kinesis Streams Service
-Follow the instructions from the [Amazon Kinesis Streams Developer 
Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to setup Kinesis streams.
+To use this connector, add the below dependency to your project:
 
-## Configuring Access to Kinesis with IAM
-Make sure to create the appropriate IAM policy to allow reading / writing to / 
from the Kinesis streams. See examples 
[here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}
 
-Depending on your deployment you would choose a different Credentials Provider 
to allow access to Kinesis.
-By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the configuration, the `BASIC` 
provider is used.  
+For use in PyFlink jobs, use the following dependency:
 
-A specific Credentials Provider can **optionally** be set by using the 
`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
- 
-Supported Credential Providers are:
-* `AUTO` - Using the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
-* `BASIC` - Using access key ID and secret key supplied as configuration. 
-* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
-* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
-* `CUSTOM` - Use a custom user class as credential provider.
-* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
-* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token. 
+{{< py_connector_download_link "kinesis" >}}
 
-## Kinesis Consumer
 
-The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source 
that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can transparently handle 
resharding of streams while the job is running. Each subtask of the consumer is
-responsible for fetching data records from multiple Kinesis shards. The number 
of shards fetched by each subtask will
-change as shards are closed and created by Kinesis.
+## Kinesis Streams Source
+The `KinesisStreamsSource` is an exactly-once, parallel streaming data source 
based on the [FLIP-27 source 
interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+The source subscribes to a single Amazon Kinesis Data stream, and reads events 
whilst maintaining order within a specific Kinesis `partitionId`.
+The `KinesisStreamsSource` will discover the shards of the stream and start 
reading from each shard in parallel, depending on the parallelism of the 
operator.
+For more details on selecting the right parallelism, see section on 
[parallelism](#parallelism-and-number-of-shards).
+It also transparently handles discovery of new shards of the Kinesis Data 
stream if resharding of streams occurs while the job is running.
+
+{{< hint info >}}
+Note: Before consuming data, ensure that the Kinesis Data Stream is created 
with `ACTIVE` status on the Amazon Kinesis Data Streams console.
+{{< /hint >}}
 
-Before consuming data from Kinesis streams, make sure that all streams are 
created with the status "ACTIVE" in the Amazon Kinesis Data Stream console.
+The `KinesisStreamsSource` provides a fluent builder to construct an instance 
of the `KinesisStreamsSource`. 
+The code snippet below illustrates how to do so. 
 
-{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3371" >}}
 {{< tab "Java" >}}
 ```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+// Configure the KinesisStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, 
KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, 
by default connector will read from LATEST
+
+// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
+KinesisStreamsSource<String> kdsSource =
+        KinesisStreamsSource.<String>builder()
+                
.setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+                .setSourceConfig(sourceConfig)
+                .setDeserializationSchema(new SimpleStringSchema())
+                
.setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This 
is optional, by default uniformShardAssigner will be used.
+                .build();
 
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+// Specify watermarking strategy and the name of the Kinesis Source operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.
+DataStream<String> kinesisRecordsWithEventTimeWatermarks = 
env.fromSource(kdsSource, 
WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)),
 "Kinesis source")
+        .returns(TypeInformation.of(String.class))
+        .uid("custom-uid");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
-
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+val sourceConfig = new Configuration()
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, 
KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON) // This is optional, 
by default connector will read from LATEST
 
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-consumer_config = {
-    'aws.region': 'us-east-1',
-    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
-    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
-    'flink.stream.initpos': 'LATEST'
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
-env = StreamExecutionEnvironment.get_execution_environment()
+val kdsSource = KinesisStreamsSource.builder[String]()
+            
.setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+            .setSourceConfig(sourceConfig)
+            .setDeserializationSchema(new SimpleStringSchema())
+            
.setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This 
is optional, by default uniformShardAssigner will be used.
+            .build()
 
-kinesis = env.add_source(FlinkKinesisConsumer("stream-1", 
SimpleStringSchema(), consumer_config))
+val kinesisEvents = env.fromSource(kdsSource, 
WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)),
 "Kinesis source")
+            .uid("custom-uid")
 ```
 {{< /tab >}}
-{{< /tabs >}}
 
-The above is a simple example of using the consumer. Configuration for the 
consumer is supplied with a `java.util.Properties`
-instance, the configuration keys for which can be found in 
`AWSConfigConstants` (AWS-specific parameters) and 
-`ConsumerConfigConstants` (Kinesis consumer parameters). The example
-demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". 
The AWS credentials are supplied using the basic method in which
-the AWS access key ID and secret access key are directly supplied in the 
configuration. Also, data is being consumed
-from the newest position in the Kinesis stream (the other option will be 
setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
-to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream 
from the earliest record possible).
-
-Other optional configuration keys for the consumer can be found in 
`ConsumerConfigConstants`.
-
-Note that the configured parallelism of the Flink Kinesis Consumer source
-can be completely independent of the total number of shards in the Kinesis 
streams.
-When the number of shards is larger than the parallelism of the consumer,
-then each consumer subtask can subscribe to multiple shards; otherwise
-if the number of shards is smaller than the parallelism of the consumer,
-then some consumer subtasks will simply be idle and wait until it gets assigned
-new shards (i.e., when the streams are resharded to increase the
-number of shards for higher provisioned Kinesis service throughput).
-
-Also note that the default assignment of shards to subtasks is based on the 
hashes of the shard and stream names,
-which will more-or-less balance the shards across the subtasks.
-However, assuming the default Kinesis shard management is used on the stream 
(UpdateShardCount with `UNIFORM_SCALING`),
-setting `UniformShardAssigner` as the shard assigner on the consumer will much 
more evenly distribute shards to subtasks.
-Assuming the incoming Kinesis records are assigned random Kinesis 
`PartitionKey` or `ExplicitHashKey` values,
-the result is consistent subtask loading.
-If neither the default assigner nor the `UniformShardAssigner` suffice, a 
custom implementation of `KinesisShardAssigner` can be set.
-
-### The `DeserializationSchema`
-
-Flink Kinesis Consumer also needs a schema to know how to turn the binary data 
in a Kinesis Data Stream into Java objects.
-The `KinesisDeserializationSchema` allows users to specify such a schema. The 
`T deserialize(byte[] recordValue, String partitionKey, String seqNum, long 
approxArrivalTimestamp, String stream, String shardId)` 
-method gets called for each Kinesis record.
+The above is a simple example of using the `KinesisStreamsSource`.
+- The Kinesis stream being read from is specified using the Kinesis Stream ARN.
+- Configuration for the `Source` is supplied using an instance of Flink's 
`Configuration` class.
+  The configuration keys can be taken from `AWSConfigOptions` (AWS-specific 
configuration) and `KinesisSourceConfigOptions` (Kinesis Source configuration).
+- The example specifies the starting position as `TRIM_HORIZON` (see 
[Configuring Starting Position](#configuring-starting-position) for more 
information).
+- The deserialization format is as `SimpleStringSchema` (see [Deserialization 
Schema](#deserialization-schema) for more information).
+- The distribution of shards across subtasks is controlled using the 
`UniformShardAssigner`  (see [Shard Assignment 
Strategy](#shard-assignment-strategy) for more information).
+- The example also specifies an increasing `WatermarkStrategy`, which means 
each record will be tagged with event time specified using 
`approximateArrivalTimestamp`. 
+  Monotonically increasing watermarks will be generated, and subtasks will be 
considered idle if no record is emitted after 1 second.
 
-For convenience, Flink provides the following schemas out of the box:
-  
-1. `TypeInformationSerializationSchema` which creates a schema based on a 
Flink's `TypeInformation`. 
-    This is useful if the data is both written and read by Flink.
-    This schema is a performant Flink-specific alternative to other generic 
serialization approaches.
-    
-2. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to lookup 
the writer's schema (schema which was used to write the record)
-   in [AWS Glue Schema 
Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). 
Using this, deserialization schema record will be
-   read with the schema retrieved from AWS Glue Schema Registry and 
transformed to either 
`com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`
-   that represents generic record with a manually provided schema or a JAVA 
POJO generated by 
[mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
  
-   
-   <br>To use this deserialization schema one has to add the following 
additional dependency:
-       
-{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
-{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
-{{< connector_artifact flink-json-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
-    
-3. `AvroDeserializationSchema` which reads data serialized with Avro format 
using a statically provided schema. It can
-    infer the schema from Avro generated classes 
(`AvroDeserializationSchema.forSpecific(...)`) or it can work with 
`GenericRecords`
-    with a manually provided schema (with 
`AvroDeserializationSchema.forGeneric(...)`). This deserialization schema 
expects that
-    the serialized records DO NOT contain the embedded schema.
+### Configuring Access to Kinesis with IAM
+Access to Kinesis streams are controlled via IAM identities. Make sure to 
create the appropriate IAM policy to allow reading / writing to / from the 
Kinesis streams. See examples 
[here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
 
-    - You can use [AWS Glue Schema 
Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
-      to retrieve the writer’s schema. Similarly, the deserialization record 
will be read with the schema from AWS Glue Schema Registry and transformed
-      (either through 
`GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or 
`GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`).
-      For more information on integrating the AWS Glue Schema Registry with 
Apache Flink see
-      [Use Case: Amazon Kinesis Data Analytics for Apache 
Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink).
+Depending on your deployment, you can select a suitable AWS Credentials 
Provider.
+By default, the `AUTO` Credentials Provider is used. 
+If the access key ID and secret key are set in the configuration, the `BASIC` 
provider is used.
 
-    <br>To use this deserialization schema one has to add the following 
additional dependency:
-    
-{{< tabs "71c8eb0c-6a78-476f-a52e-8a46d83f2ca4" >}}
-{{< tab "AvroDeserializationSchema" >}}
-{{< artifact flink-avro >}}
-{{< /tab >}}
-{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}}
-{{< connector_artifact flink-avro-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
+A specific Credentials Provider can **optionally** be set by using the 
`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
 
-### Configuring Starting Position
+Supported Credential Providers are:
+* `AUTO` - Using the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
+* `BASIC` - Using access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
+* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
+* `CUSTOM` - Use a custom user class as credential provider.
+* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token.
 
-The Flink Kinesis Consumer currently provides the following options to 
configure where to start reading Kinesis streams, simply by setting 
`ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
-one of the following values in the provided configuration properties (the 
naming of the options identically follows [the namings used by the AWS Kinesis 
Streams 
service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
+### Configuring Starting Position
+To specify where the `KinesisStreamsSource` starts reading from the Kinesis 
stream, users can set the `KinesisSourceConfigOptions.STREAM_INITIAL_POSITION` 
in configuration.
+The values used follow [the namings used by the AWS Kinesis Data Streams 
service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax):
 
-- `LATEST`: read all shards of all streams starting from the latest record.
-- `TRIM_HORIZON`: read all shards of all streams starting from the earliest 
record possible (data may be trimmed by Kinesis depending on the retention 
settings).
-- `AT_TIMESTAMP`: read all shards of all streams starting from a specified 
timestamp. The timestamp must also be specified in the configuration
-properties by providing a value for 
`ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following 
date pattern :
+- `LATEST`: read all shards of the stream starting from the latest record.
+- `TRIM_HORIZON`: read all shards of the stream starting from the earliest 
record possible (data may be trimmed by Kinesis depending on the retention 
settings).
+- `AT_TIMESTAMP`: read all shards of the stream starting from a specified 
timestamp. The timestamp must also be specified in the configuration
+  properties by providing a value for 
`KinesisSourceConfigOptions.STREAM_INITIAL_TIMESTAMP`, in one of the following 
date pattern :
     - a non-negative double value representing the number of seconds that has 
elapsed since the Unix epoch (for example, `1459799926.480`).
-    - a user defined pattern, which is a valid pattern for `SimpleDateFormat` 
provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
-    If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined 
then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
-    (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` 
given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without 
given a pattern).
+    - a user defined pattern, which is a valid pattern for `SimpleDateFormat` 
provided by `KinesisSourceConfigOptions.STREAM_TIMESTAMP_DATE_FORMAT`.
+      If `KinesisSourceConfigOptions.STREAM_TIMESTAMP_DATE_FORMAT` is not 
defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
+      (for example, timestamp value is `2016-04-04` and pattern is 
`yyyy-MM-dd` given by user or timestamp value is 
`2016-04-04T19:58:46.480-00:00` without given a pattern).
+
 
 ### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 
-With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume 
records from shards in Kinesis streams and
+With Flink's checkpointing enabled, the `KinesisStreamsSource` will consume 
records from shards in Kinesis streams and
 periodically checkpoint each shard's progress. In case of a job failure, Flink 
will restore the streaming program to the
 state of the latest complete checkpoint and re-consume the records from 
Kinesis shards, starting from the progress that
 was stored in the checkpoint.
 
-The interval of drawing checkpoints therefore defines how much the program may 
have to go back at most, in case of a failure.
+Note that when restoring from a snapshot, the configured starting positions 
will be ignored. The `KinesisStreamsSource` 
+will proceed to read from where it left off in the snapshot. If the restored 
snapshot is stale (e.g. the shards saved are 
+now expired and past the retention period of the Kinesis stream), it will read 
the earliest possible event (effectively a `TRIM_HORIZON`)
 
-To use fault tolerant Kinesis Consumers, checkpointing of the topology needs 
to be enabled at the execution environment:
+If users want to restore a Flink job from an existing snapshot but want to 
respect the configured starting position of 
+the stream, users can change the `uid` of the `KinesisStreamsSource` operator 
to effectively restore this operator without state.
 
-{{< tabs "b1399ed7-5855-446d-9684-7a49de9b4c97" >}}
-{{< tab "Java" >}}
-```java
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.enable_checkpointing(5000) # checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< /tabs >}}
 
-Also note that Flink can only restart the topology if enough processing slots 
are available to restart the topology.
-Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
+### Shard Assignment Strategy
+For most use cases, users would prefer a uniform distribution of records 
across parallel subtasks. This prevents data skew if data is evenly distributed 
in the Kinesis Data Stream.
+This is achieved by the `UniformShardAssigner`, which is the default shard 
assignment strategy. Users can implement their own custom strategy by 
implementing the interface for `KinesisShardAssigner`.
 
-### Using Enhanced Fan-Out
+The uniform distribution of shards across parallel subtasks is a tricky 
situation, especially if the stream has been resharded.
+Amazon Kinesis Data streams distributes `partitionId`s evenly across the 
entire `HashKeyRange` of a given stream, and these ranges are evenly 
distributed across all open shards if `UNIFORM_SCALING` is used.
+However, there will be a mixture of Open and Closed shards on the Kinesis Data 
Stream, and the status of each shard can change during a rescaling operation.
 
-[Enhanced Fan-Out 
(EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the 
maximum 
-number of concurrent consumers per Kinesis stream.
-Without EFO, all concurrent consumers share a single read quota per shard. 
-Using EFO, each consumer gets a distinct dedicated read quota per shard, 
allowing read throughput to scale with the number of consumers. 
-Using EFO will [incur additional 
cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
- 
-In order to enable EFO two additional configuration parameters are required:
+To ensure a uniform distribution of `partitionId`s across each parallel 
subtask, the `UniformShardAssigner` uses the `HashKeyRange` of each shard to 
decide which parallel subtask will read from the discovered shard.
 
-- `RECORD_PUBLISHER_TYPE`: Determines whether to use `EFO` or `POLLING`. The 
default `RecordPublisher` is `POLLING`.
-- `EFO_CONSUMER_NAME`: A name to identify the consumer. 
-For a given Kinesis data stream, each consumer must have a unique name. 
-However, consumer names do not have to be unique across data streams. 
-Reusing a consumer name will result in existing subscriptions being terminated.
+### Record ordering
+Kinesis maintains the write order of records per `partitionId` within a 
Kinesis stream. The `KinesisStreamsSource` reads
+records in the same order within a given `partitionId`, even through 
resharding operations. It does this by first checking if 
+a given shard's parents (up to 2 shards) have been completely read, before 
proceeding to read from the given shard.
 
-The code snippet below shows a simple example configurating an EFO consumer.
+### Deserialization Schema
+The `KinesisStreamsSource` retrieves binary data from the Kinesis Data Stream, 
and needs a schema to convert it into Java objects. 
+Both Flink's `DeserializationSchema` and the custom 
`KinesisDeserializationSchema` are accepted by the `KinesisStreamsSource`.
+The `KinesisDeserializationSchema` provides additional Kinesis-specific 
metadata per record to allow users to make serialization decisions based off 
the metadata.
 
-{{< tabs "42345893-70c3-4678-a348-4c419b337eb1" >}}
-{{< tab "Java" >}}
-```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+For convenience, Flink provides the following schemas out of the box:
+1. `SimpleStringSchema` and `JsonSerializationSchema`.
 
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
+2. `TypeInformationSerializationSchema` which creates a schema based on a 
Flink's `TypeInformation`.
+   This is useful if the data is both written and read by Flink.
+   This schema is a performant Flink-specific alternative to other generic 
serialization approaches.
 
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+3. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to lookup 
the writer's schema (schema which was used to write the record)
+   in [AWS Glue Schema 
Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). 
Using this, deserialization schema record will be
+   read with the schema retrieved from AWS Glue Schema Registry and 
transformed to either 
`com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`
+   that represents generic record with a manually provided schema or a JAVA 
POJO generated by 
[mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
 
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+   <br>To use this deserialization schema one has to add the following 
additional dependency:
 
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
+{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
+{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
+{{< connector_artifact flink-json-glue-schema-registry kinesis >}}
+{{< /tab >}}
+{{< /tabs >}}
 
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
+4. `AvroDeserializationSchema` which reads data serialized with Avro format 
using a statically provided schema. It can
+   infer the schema from Avro generated classes 
(`AvroDeserializationSchema.forSpecific(...)`) or it can work with 
`GenericRecords`
+   with a manually provided schema (with 
`AvroDeserializationSchema.forGeneric(...)`). This deserialization schema 
expects that
+   the serialized records DO NOT contain the embedded schema.
 
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-consumer_config = {
-    'aws.region': 'us-east-1',
-    'flink.stream.initpos': 'LATEST',
-    'flink.stream.recordpublisher':  'EFO',
-    'flink.stream.efo.consumername': 'my-flink-efo-consumer'
-}
+    - You can use [AWS Glue Schema 
Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
+      to retrieve the writer’s schema. Similarly, the deserialization record 
will be read with the schema from AWS Glue Schema Registry and transformed
+      (either through 
`GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or 
`GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`).
+      For more information on integrating the AWS Glue Schema Registry with 
Apache Flink see
+      [Use Case: Amazon Kinesis Data Analytics for Apache 
Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink).
 
-env = StreamExecutionEnvironment.get_execution_environment()
+   <br>To use this deserialization schema one has to add the following 
additional dependency:
 
-kinesis = env.add_source(FlinkKinesisConsumer(
-    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
-```
+{{< tabs "71c8eb0c-6a78-476f-a52e-8a46d83f2ca4" >}}
+{{< tab "AvroDeserializationSchema" >}}
+{{< artifact flink-avro >}}
+{{< /tab >}}
+{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}}
+{{< connector_artifact flink-avro-glue-schema-registry kinesis >}}
 {{< /tab >}}
 {{< /tabs >}}
 
-#### EFO Stream Consumer Registration/Deregistration
-
-In order to use EFO, a stream consumer must be registered against each stream 
you wish to consume.
-By default, the `FlinkKinesisConsumer` will register the stream consumer 
automatically when the Flink job starts.
-The stream consumer will be registered using the name provided by the 
`EFO_CONSUMER_NAME` configuration.
-`FlinkKinesisConsumer` provides three registration strategies:
-
-- Registration
-  - `LAZY` (default): Stream consumers are registered when the Flink job 
starts running.
-    If the stream consumer already exists, it will be reused.
-    This is the preferred strategy for the majority of applications.
-    However, jobs with parallelism greater than 1 will result in tasks 
competing to register and acquire the stream consumer ARN.
-    For jobs with very large parallelism this can result in an increased 
start-up time.
-    The `DescribeStreamConsumer` operation has a limit of 20 [transactions per 
second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
-    this means application startup time will increase by roughly 
`parallelism/20 seconds`.
-  - `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` 
constructor.
-    If the stream consumer already exists, it will be reused. 
-    This will result in registration occurring when the job is constructed, 
-    either on the Flink Job Manager or client environment submitting the job.
-    Using this strategy results in a single thread registering and retrieving 
the stream consumer ARN, 
-    reducing startup time over `LAZY` (with large parallelism).
-    However, consider that the client environment will require access to the 
AWS services.
-  - `NONE`: Stream consumer registration is not performed by 
`FlinkKinesisConsumer`.
-    Registration must be performed externally using the [AWS CLI or 
SDK](https://aws.amazon.com/tools/)
-    to invoke 
[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).
-    Stream consumer ARNs should be provided to the job via the consumer 
configuration.
-- Deregistration
-  - `LAZY` (default): Stream consumers are deregistered when the job is 
shutdown gracefully.
-    In the event that a job terminates without executing the shutdown hooks, 
stream consumers will remain active.
-    In this situation the stream consumers will be gracefully reused when the 
application restarts. 
-  - `EAGER|NONE`: Stream consumer deregistration is not performed by 
`FlinkKinesisConsumer`.
-
-Below is an example configuration to use the `EAGER` registration strategy:
-
-{{< tabs "a85d716b-6c1c-46d8-9ee4-12d8380a0c06" >}}
-{{< tab "Java" >}}
-```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
-
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
+### Parallelism and Number of Shards
+The configured parallelism of the `KinesisStreamsSource` is independent of the 
total number of shards in the Kinesis streams.
+- If the parallelism of the `KinesisStreamsSource` is less than the total 
number of shards, then a single parallel subtask would handle multiple shards.
+- If the parallelism of the `KinesisStreamsSource` is more than the total 
number of shards, then there will be some parallel subtasks that do not read 
from any shards. If this is the case, users will need to set up `withIdleness` 
on the `WatermarkStrategy`. Failing to do so will mean watermark generation 
will get blocked due to idle subtasks.
 
-consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
-    ConsumerConfigConstants.EFORegistrationType.EAGER.name());
+### Watermark Handling in the source
 
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+The `KinesisStreamsSource` supplies the `approximateArrivalTimestamp` provided 
by Kinesis as the event time associated with each record read.
+For more information on event time handling in Flink, see [Event time]({{< ref 
"docs/concepts/time" >}}). 
+Note that this timestamp is typically referred to as a Kinesis server-side 
timestamp, and there are no guarantees 
+about the accuracy or order correctness (i.e., the timestamps may not always 
be ascending).
 
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
+### Event Time Alignment for Shard Readers
 
-consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
-    ConsumerConfigConstants.EFORegistrationType.EAGER.name());
+Since the shards are being consumed in parallel, some shards might be reading 
records far ahead from other shards.
+Flink supports synchronization between these reading speed using 
split-specific watermark alignment. The `KinesisStreamsSource`
+supports split-specific watermark alignment, and will pause reading from 
specific shards if the watermark from that shard
+is too far ahead of others. It will resume reading from that specific split 
once the other shards catch up.
 
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
+### Threading Model
 
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-consumer_config = {
-    'aws.region': 'us-east-1',
-    'flink.stream.initpos': 'LATEST',
-    'flink.stream.recordpublisher':  'EFO',
-    'flink.stream.efo.consumername': 'my-flink-efo-consumer',
-    'flink.stream.efo.registration': 'EAGER'
-}
+The `KinesisStreamsSource` uses multiple threads for shard discovery and data 
consumption.
 
-env = StreamExecutionEnvironment.get_execution_environment()
+#### Shard Discovery
 
-kinesis = env.add_source(FlinkKinesisConsumer(
-    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
-```
-{{< /tab >}}
-{{< /tabs >}}
+For shard discovery, the `SplitEnumerator` runs on the JobManager to 
periodically discover new shards using the `ListShard` API.
+Once new shards are discovered, it will confirm if the parent shards have been 
completed. If all parents have been completed,
+the shards will be assigned to the `SplitReader` on the TaskManagers to be 
read.
 
-Below is an example configuration to use the `NONE` registration strategy:
+#### Polling (default) Split Reader
 
-{{< tabs "00b46c87-7740-4263-8040-2aa7e2960513" >}}
-{{< tab "Java" >}}
-```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+For `POLLING` data consumption, a single thread will be created per-parallel 
subtask to consume allocated shards. This means 
+that the number of open threads scale with the parallelism of the Flink 
operator.
 
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
+#### Enhanced Fan-Out Split Reader
 
-consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
-    ConsumerConfigConstants.EFORegistrationType.NONE.name());
-consumerConfig.put(ConsumerConfigConstants.efoConsumerArn("stream-name"), 
-    
"arn:aws:kinesis:<region>:<account>>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>");
+For `EFO` data consumption the threading model is the same as `POLLING` - one 
thread per-parallel subtask. However, 
+there are additional thread pools to handle asynchronous communication with 
Kinesis. AWS SDK v2.x `KinesisAsyncClient` 
+uses additional threads for Netty to handle IO and asynchronous response. Each 
parallel subtask will have their own 
+instance of the `KinesisAsyncClient`. In other words, if the consumer is run 
with a parallelism of 10, there will be a 
+total of 10 `KinesisAsyncClient` instances. A separate client will be created 
and subsequently destroyed when 
+registering and deregistering stream consumers.
 
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+### Internally Used Kinesis APIs
 
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+The `KinesisStreamsSource` uses the [AWS Java 
SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
+for shard discovery and data consumption. Due to Amazon's [service limits for 
Kinesis 
Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html),
+the `KinesisStreamsSource` will compete with other non-Flink consuming 
applications that the user may be running.
+Below is a list of APIs called by the consumer with description of how the 
consumer uses the API, as well as information
+on how to deal with any errors or warnings that the `KinesisStreamsSource` may 
have due to these service limits.
 
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
+#### Shard Discovery
 
-consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
-    ConsumerConfigConstants.EFORegistrationType.NONE.name());
-consumerConfig.put(ConsumerConfigConstants.efoConsumerArn("stream-name"), 
-    
"arn:aws:kinesis:<region>:<account>>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>");
+- 
*[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)*:
 this is periodically called
+  by the `SplitEnumerator`, one per Flink job, to discover any new shards as a 
result of stream resharding. By default,
+  the `SplitEnumerator` performs the shard discovery at an interval of 10 
seconds. If this interferes with other non-Flink 
+  consuming applications, users can slow down the calls to this API by setting 
a value for 
+  `KinesisSourceConfigOptions.SHARD_DISCOVERY_INTERVAL` in the supplied 
`Configuration`. 
+  This sets the discovery interval to a different value. Note that this 
setting directly impacts
+  the maximum delay of discovering a new shard and starting to consume it, as 
shards will not be discovered during the interval.
 
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
+#### Polling (default) Split Reader
 
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-consumer_config = {
-    'aws.region': 'us-east-1',
-    'flink.stream.initpos': 'LATEST',
-    'flink.stream.recordpublisher':  'EFO',
-    'flink.stream.efo.consumername': 'my-flink-efo-consumer',
-    'flink.stream.efo.consumerarn.stream-name':
-        
'arn:aws:kinesis:<region>:<account>>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>'
-}
+- 
*[GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)*:
 this is called once per shard, Note that since the rate limit for this API is 
per shard (not per stream),
+  the split reader itself should not exceed the limit. 
 
-env = StreamExecutionEnvironment.get_execution_environment()
+- 
*[GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*:
 this is constantly called to fetch records from Kinesis. When a shard has 
multiple concurrent consumers (when there
+  are any other non-Flink consuming applications running), the per shard rate 
limit may be exceeded. By default, on each call
+  of this API, the consumer will retry if Kinesis complains that the data size 
/ transaction limit for the API has exceeded.
 
-kinesis = env.add_source(FlinkKinesisConsumer(
-    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
-```
-{{< /tab >}}
-{{< /tabs >}}
+#### Enhanced Fan-Out Split Reader
 
-### Event Time for Consumed Records
+- 
*[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)*:
 this is called per shard to obtain shard subscriptions. A shard subscription 
is typically active for 5 minutes,
+  but subscriptions will be re-acquired if any recoverable errors are thrown. 
Once a subscription is acquired, the consumer
+  will receive a stream of 
[SubscribeToShardEvents](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)s.
 
-If streaming topologies choose to use the [event time notion]({{< ref 
"docs/concepts/time" >}}) for record
-timestamps, an *approximate arrival timestamp* will be used by default. This 
timestamp is attached to records by Kinesis once they
-were successfully received and stored by streams. Note that this timestamp is 
typically referred to as a Kinesis server-side
-timestamp, and there are no guarantees about the accuracy or order correctness 
(i.e., the timestamps may not always be
-ascending).
+- 
*[DescribeStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html)*:
+  this is called only during application startup per parallel subtask of the 
operator. This is to retrieve the `consumerArn`
+  for the `ACTIVE` consumer attached to the stream. The retry strategy can be 
configured using 
`KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY*` options
 
-Users can choose to override this default with a custom timestamp, as 
described [here]({{< ref "docs/dev/datastream/event-time/generating_watermarks" 
>}}),
-or use one from the [predefined ones]({{< ref 
"docs/dev/datastream/event-time/built_in" >}}). After doing so,
-it can be passed to the consumer in the following way:
+- 
*[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*:
+  this is called once per stream during stream consumer registration, unless 
the `SELF_MANAGED` consumer lifecycle is configured.
 
-{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6600" >}}
-{{< tab "Java" >}}
-```java
-FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
-    "kinesis_stream_name",
-    new SimpleStringSchema(),
-    kinesisConsumerConfig);
-consumer.setPeriodicWatermarkAssigner(new 
CustomAssignerWithPeriodicWatermarks());
-DataStream<String> stream = env
-       .addSource(consumer)
-       .print();
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val consumer = new FlinkKinesisConsumer[String](
-    "kinesis_stream_name",
-    new SimpleStringSchema(),
-    kinesisConsumerConfig);
-consumer.setPeriodicWatermarkAssigner(new 
CustomAssignerWithPeriodicWatermarks());
-val stream = env
-       .addSource(consumer)
-       .print();
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-consumer = FlinkKinesisConsumer(
-    "kinesis_stream_name",
-    SimpleStringSchema(),
-    consumer_config)
-stream = env.add_source(consumer).print()
-```
-{{< /tab >}}
-{{< /tabs >}}
+- 
*[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*:
+  this is called once per stream during stream consumer deregistration, unless 
the `SELF_MANAGED` registration strategy is configured.
 
-Internally, an instance of the assigner is executed per shard / consumer 
thread (see threading model below).
-When an assigner is specified, for each record read from Kinesis, the 
extractTimestamp(T element, long previousElementTimestamp)
-is called to assign a timestamp to the record and getCurrentWatermark() to 
determine the new watermark for the shard.
-The watermark of the consumer subtask is then determined as the minimum 
watermark of all its shards and emitted periodically.
-The per shard watermark is essential to deal with varying consumption speed 
between shards, that otherwise could lead
-to issues with downstream logic that relies on the watermark, such as 
incorrect late data dropping.
+### Using Enhanced Fan-Out
 
-By default, the watermark is going to stall if shards do not deliver new 
records.
-The property `ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS` can be used 
to avoid this potential issue through a
-timeout that will allow the watermark to progress despite of idle shards.
+[Enhanced Fan-Out 
(EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the 
maximum number of concurrent consumers per Kinesis stream.
+Without EFO, all concurrent consumers share a single read quota per shard.
+Using EFO, each consumer gets a distinct dedicated read quota per shard, 
allowing read throughput to scale with the number of consumers.
+Using EFO will [incur additional 
cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
 
-### Event Time Alignment for Shard Consumers
+In order to enable EFO two additional configuration parameters are required:
 
-The Flink Kinesis Consumer optionally supports synchronization between 
parallel consumer subtasks (and their threads)
-to avoid the event time skew related problems described in [Event time 
synchronization across 
sources](https://issues.apache.org/jira/browse/FLINK-10886).
+- `READER_TYPE`: Determines whether to use `EFO` or `POLLING`. The default 
`ReaderType` is `POLLING`.
+- `EFO_CONSUMER_NAME`: A name to identify the consumer.
+  For a given Kinesis data stream, each consumer must have a unique name.
+  However, consumer names do not have to be unique across data streams.
+  Reusing a consumer name will result in existing subscriptions being 
terminated.
 
-To enable synchronization, set the watermark tracker on the consumer:
+The code snippet below shows a simple example configuring an EFO consumer.
 
-{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6601" >}}
+{{< tabs "42345893-70c3-4678-a348-4c419b337eb1" >}}
 {{< tab "Java" >}}
 ```java
-JobManagerWatermarkTracker watermarkTracker =
-    new JobManagerWatermarkTracker("myKinesisSource");
-consumer.setWatermarkTracker(watermarkTracker);
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, 
KinesisSourceConfigOptions.ReaderType.EFO);
+sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
 ```
 {{< /tab >}}
-{{< tab "Python" >}}
-```python
-watermark_tracker = 
WatermarkTracker.job_manager_watermark_tracker("myKinesisSource")
-consumer.set_watermark_tracker(watermark_tracker)
+{{< tab "Scala" >}}
+```scala
+val sourceConfig = new Configuration()
+sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, 
KinesisSourceConfigOptions.ReaderType.EFO)
+sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer")
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
-The `JobManagerWatermarkTracker` will use a global aggregate to synchronize 
the per subtask watermarks. Each subtask
-uses a per shard queue to control the rate at which records are emitted 
downstream based on how far ahead of the global
-watermark the next record in the queue is.
-
-The "emit ahead" limit is configured via 
`ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS`. Smaller values reduce
-the skew but also the throughput. Larger values will allow the subtask to 
proceed further before waiting for the global
-watermark to advance.
-
-Another variable in the throughput equation is how frequently the watermark is 
propagated by the tracker.
-The interval can be configured via 
`ConsumerConfigConstants.WATERMARK_SYNC_MILLIS`.
-Smaller values reduce emitter waits and come at the cost of increased 
communication with the job manager.
-
-Since records accumulate in the queues when skew occurs, increased memory 
consumption needs to be expected.
-How much depends on the average record size. With larger sizes, it may be 
necessary to adjust the emitter queue capacity via
-`ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY`.
-
-### Threading Model
-
-The Flink Kinesis Consumer uses multiple threads for shard discovery and data 
consumption.
+#### EFO Stream Consumer Lifecycle Management
 
-#### Shard Discovery
-
-For shard discovery, each parallel consumer subtask will have a single thread 
that constantly queries Kinesis for shard
-information even if the subtask initially did not have shards to read from 
when the consumer was started. In other words, if
-the consumer is run with a parallelism of 10, there will be a total of 10 
threads constantly querying Kinesis regardless
-of the total amount of shards in the subscribed streams.
-
-#### Polling (default) Record Publisher
+In order to use EFO, a stream consumer must be registered against the stream 
you wish to consume.
+By default, the `KinesisStreamsSource` will manage the lifecycle of the stream 
consumer automatically when the Flink job starts/stops.
+The stream consumer will be registered using the name provided by the 
`EFO_CONSUMER_NAME` configuration, and de-registered when job stops gracefully.
+`KinesisStreamsSource` provides two lifecycle options:
 
-For `POLLING` data consumption, a single thread will be created to consume 
each discovered shard. Threads will terminate when the
-shard it is responsible of consuming is closed as a result of stream 
resharding. In other words, there will always be
-one thread per open shard.
+- `JOB_MANAGED` (default): Stream consumers are registered when the Flink job 
starts running.
+  If the stream consumer already exists, it will be reused.
+  When the job stops gracefully, the consumer will be de-registered.
+  This is the preferred strategy for the majority of applications.
+- `SELF_MANAGED`: Stream consumer registration/de-registration will not be 
performed by the `KinesisStreamsSource`.
+  Registration must be performed externally using the [AWS CLI or 
SDK](https://aws.amazon.com/tools/)
+  to invoke 
[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).
+  Stream consumer ARNs should be provided to the job via the consumer 
configuration.
 
-#### Enhanced Fan-Out Record Publisher
-
-For `EFO` data consumption the threading model is the same as `POLLING`, with 
additional thread pools to handle 
-asynchronous communication with Kinesis. AWS SDK v2.x `KinesisAsyncClient` 
uses additional threads for 
-Netty to handle IO and asynchronous response. Each parallel consumer subtask 
will have their own instance of the `KinesisAsyncClient`.
-In other words, if the consumer is run with a parallelism of 10, there will be 
a total of 10 `KinesisAsyncClient` instances.
-A separate client will be created and subsequently destroyed when registering 
and deregistering stream consumers.
-
-### Internally Used Kinesis APIs
+To specify the lifecycle management, simply specify the 
`KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE` in the `Configuration` 
provided.

Review Comment:
   This is explained in the blurb above. Made this clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to