hlteoh37 commented on code in PR #179:
URL: https://github.com/apache/flink-connector-aws/pull/179#discussion_r1832527361


##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -27,644 +27,348 @@ under the License.
 
 # Amazon Kinesis Data Streams Connector
 
-The Kinesis connector provides access to [Amazon Kinesis Data 
Streams](http://aws.amazon.com/kinesis/streams/).
-
-To use this connector, add one or more of the following dependencies to your 
project, depending on whether you are reading from and/or writing to Kinesis 
Data Streams:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">KDS Connectivity</th>
-      <th class="text-left">Maven Dependency</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>Source</td>
-        <td>{{< connector_artifact flink-connector-kinesis kinesis >}}</td>
-    </tr>
-    <tr>
-        <td>Sink</td>
-        <td>{{< connector_artifact flink-connector-aws-kinesis-streams kinesis 
>}}</td>
-    </tr>
-  </tbody>
-</table>
+The Kinesis connector allows users to read from and write to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
 
-{{< py_connector_download_link "kinesis" >}}
+## Dependency
 
-## Using the Amazon Kinesis Streams Service
-Follow the instructions from the [Amazon Kinesis Streams Developer 
Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to setup Kinesis streams.
+To use this connector, add the following dependency to your project:
 
-## Configuring Access to Kinesis with IAM
-Make sure to create the appropriate IAM policy to allow reading / writing to / 
from the Kinesis streams. See examples 
[here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}
 
-Depending on your deployment you would choose a different Credentials Provider 
to allow access to Kinesis.
-By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the configuration, the `BASIC` 
provider is used.  
+For use in PyFlink jobs, use the following dependency:
 
-A specific Credentials Provider can **optionally** be set by using the 
`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
- 
-Supported Credential Providers are:
-* `AUTO` - Using the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
-* `BASIC` - Using access key ID and secret key supplied as configuration. 
-* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
-* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
-* `CUSTOM` - Use a custom user class as credential provider.
-* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
-* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token. 
+{{< py_connector_download_link "kinesis" >}}
 
-## Kinesis Consumer
 
-The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source 
that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can transparently handle 
resharding of streams while the job is running. Each subtask of the consumer is
-responsible for fetching data records from multiple Kinesis shards. The number 
of shards fetched by each subtask will
-change as shards are closed and created by Kinesis.
+## Kinesis Streams Source
+The `KinesisStreamsSource` is an exactly-once, parallel streaming data source 
based on the [FLIP-27 source 
interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+The source subscribes to a single Amazon Kinesis Data stream and reads events while maintaining order within a specific Kinesis `partitionId`.
+The `KinesisStreamsSource` discovers the shards of the stream and starts reading from each shard in parallel, depending on the parallelism of the operator.
+For more details on selecting the right parallelism, see the section on [parallelism](#parallelism-and-number-of-shards).
+It also transparently handles the discovery of new shards if the Kinesis Data stream is resharded while the job is running.
+
+{{< hint info >}}
+Note: Before consuming data, ensure that the Kinesis Data Stream is created 
with `ACTIVE` status on the Amazon Kinesis Data Streams console.
+{{< /hint >}}
 
-Before consuming data from Kinesis streams, make sure that all streams are 
created with the status "ACTIVE" in the Amazon Kinesis Data Stream console.
+The `KinesisStreamsSource` provides a fluent builder to construct an instance of the source.
+The code snippet below illustrates how to do so.
 
-{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3371" >}}
 {{< tab "Java" >}}
 ```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+// Configure the KinesisStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional; by default the connector will read from LATEST
+
+// Create a new KinesisStreamsSource to read from the specified Kinesis Stream.
+KinesisStreamsSource<String> kdsSource =
+        KinesisStreamsSource.<String>builder()
+                
.setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+                .setSourceConfig(sourceConfig)
+                .setDeserializationSchema(new SimpleStringSchema())
+                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional; by default the uniformShardAssigner will be used.
+                .build();
 
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+// Specify the watermarking strategy and the name of the Kinesis source operator.
+// Specify the return type using TypeInformation.
+// Also specify the operator UID, in line with Flink best practices.
+DataStream<String> kinesisRecordsWithEventTimeWatermarks = 
env.fromSource(kdsSource, 
WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)),
 "Kinesis source")
+        .returns(TypeInformation.of(String.class))
+        .uid("custom-uid");
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
-
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+val sourceConfig = new Configuration()
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON) // This is optional; by default the connector will read from LATEST
 
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-consumer_config = {
-    'aws.region': 'us-east-1',
-    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
-    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
-    'flink.stream.initpos': 'LATEST'
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
-env = StreamExecutionEnvironment.get_execution_environment()
+val kdsSource = KinesisStreamsSource.builder[String]()
+            
.setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+            .setSourceConfig(sourceConfig)
+            .setDeserializationSchema(new SimpleStringSchema())
+            .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional; by default the uniformShardAssigner will be used.
+            .build()
 
-kinesis = env.add_source(FlinkKinesisConsumer("stream-1", 
SimpleStringSchema(), consumer_config))
+val kinesisEvents = env.fromSource(kdsSource, 
WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)),
 "Kinesis source")
+            .uid("custom-uid")
 ```
 {{< /tab >}}
{{< /tabs >}}
 
-The above is a simple example of using the consumer. Configuration for the 
consumer is supplied with a `java.util.Properties`
-instance, the configuration keys for which can be found in 
`AWSConfigConstants` (AWS-specific parameters) and 
-`ConsumerConfigConstants` (Kinesis consumer parameters). The example
-demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". 
The AWS credentials are supplied using the basic method in which
-the AWS access key ID and secret access key are directly supplied in the 
configuration. Also, data is being consumed
-from the newest position in the Kinesis stream (the other option will be 
setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
-to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream 
from the earliest record possible).
-
-Other optional configuration keys for the consumer can be found in 
`ConsumerConfigConstants`.
-
-Note that the configured parallelism of the Flink Kinesis Consumer source
-can be completely independent of the total number of shards in the Kinesis 
streams.
-When the number of shards is larger than the parallelism of the consumer,
-then each consumer subtask can subscribe to multiple shards; otherwise
-if the number of shards is smaller than the parallelism of the consumer,
-then some consumer subtasks will simply be idle and wait until it gets assigned
-new shards (i.e., when the streams are resharded to increase the
-number of shards for higher provisioned Kinesis service throughput).
-
-Also note that the default assignment of shards to subtasks is based on the 
hashes of the shard and stream names,
-which will more-or-less balance the shards across the subtasks.
-However, assuming the default Kinesis shard management is used on the stream 
(UpdateShardCount with `UNIFORM_SCALING`),
-setting `UniformShardAssigner` as the shard assigner on the consumer will much 
more evenly distribute shards to subtasks.
-Assuming the incoming Kinesis records are assigned random Kinesis 
`PartitionKey` or `ExplicitHashKey` values,
-the result is consistent subtask loading.
-If neither the default assigner nor the `UniformShardAssigner` suffice, a 
custom implementation of `KinesisShardAssigner` can be set.
-
-### The `DeserializationSchema`
-
-Flink Kinesis Consumer also needs a schema to know how to turn the binary data 
in a Kinesis Data Stream into Java objects.
-The `KinesisDeserializationSchema` allows users to specify such a schema. The 
`T deserialize(byte[] recordValue, String partitionKey, String seqNum, long 
approxArrivalTimestamp, String stream, String shardId)` 
-method gets called for each Kinesis record.
+The above is a simple example of using the `KinesisStreamsSource`.
+- The Kinesis stream being read from is specified using the Kinesis Stream ARN.
+- Configuration for the `Source` is supplied using an instance of Flink's 
`Configuration` class.
+  The configuration keys can be taken from `AWSConfigOptions` (AWS-specific 
configuration) and `KinesisSourceConfigOptions` (Kinesis Source configuration).
+- The example specifies the starting position as `TRIM_HORIZON` (see 
[Configuring Starting Position](#configuring-starting-position) for more 
information).
+- The deserialization format is set to `SimpleStringSchema` (see [Deserialization Schema](#deserialization-schema) for more information).
+- The distribution of shards across subtasks is controlled using the `UniformShardAssigner` (see [Shard Assignment Strategy](#shard-assignment-strategy) for more information).
+- The example also specifies a monotonically increasing `WatermarkStrategy`, which means each record is tagged with an event time taken from its `approximateArrivalTimestamp`.
+  Monotonically increasing watermarks will be generated, and subtasks will be considered idle if no record is emitted for 1 second. An alternative watermarking sketch follows this list.
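+
+If records can arrive slightly out of order within a shard for a given use case, a bounded-out-of-orderness watermark strategy can be used instead of the monotonous one shown above. The snippet below is a minimal sketch reusing the `kdsSource` and `env` from the example above; the 5-second bound is illustrative only.
+
+```java
+// Sketch: tolerate up to 5 seconds of out-of-orderness per subtask instead of
+// assuming monotonically increasing timestamps. Values are illustrative.
+DataStream<String> kinesisRecords = env.fromSource(
+        kdsSource,
+        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
+                .withIdleness(Duration.ofSeconds(1)),
+        "Kinesis source")
+    .returns(TypeInformation.of(String.class))
+    .uid("custom-uid");
+```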
 
-For convenience, Flink provides the following schemas out of the box:
-  
-1. `TypeInformationSerializationSchema` which creates a schema based on a 
Flink's `TypeInformation`. 
-    This is useful if the data is both written and read by Flink.
-    This schema is a performant Flink-specific alternative to other generic 
serialization approaches.
-    
-2. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to lookup 
the writer's schema (schema which was used to write the record)
-   in [AWS Glue Schema 
Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). 
Using this, deserialization schema record will be
-   read with the schema retrieved from AWS Glue Schema Registry and 
transformed to either 
`com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`
-   that represents generic record with a manually provided schema or a JAVA 
POJO generated by 
[mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
  
-   
-   <br>To use this deserialization schema one has to add the following 
additional dependency:
-       
-{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
-{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
-{{< connector_artifact flink-json-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
-    
-3. `AvroDeserializationSchema` which reads data serialized with Avro format 
using a statically provided schema. It can
-    infer the schema from Avro generated classes 
(`AvroDeserializationSchema.forSpecific(...)`) or it can work with 
`GenericRecords`
-    with a manually provided schema (with 
`AvroDeserializationSchema.forGeneric(...)`). This deserialization schema 
expects that
-    the serialized records DO NOT contain the embedded schema.
+### Configuring Access to Kinesis with IAM
+Access to Kinesis streams is controlled via IAM identities. Make sure to create the appropriate IAM policy to allow reading from and writing to the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
 
-    - You can use [AWS Glue Schema 
Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
-      to retrieve the writer’s schema. Similarly, the deserialization record 
will be read with the schema from AWS Glue Schema Registry and transformed
-      (either through 
`GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or 
`GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`).
-      For more information on integrating the AWS Glue Schema Registry with 
Apache Flink see
-      [Use Case: Amazon Kinesis Data Analytics for Apache 
Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink).
+Depending on your deployment, you can select a suitable AWS Credentials 
Provider.
+By default, the `AUTO` Credentials Provider is used. 
+If the access key ID and secret key are set in the configuration, the `BASIC` 
provider is used.
 
-    <br>To use this deserialization schema one has to add the following 
additional dependency:
-    
-{{< tabs "71c8eb0c-6a78-476f-a52e-8a46d83f2ca4" >}}
-{{< tab "AvroDeserializationSchema" >}}
-{{< artifact flink-avro >}}
-{{< /tab >}}
-{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}}
-{{< connector_artifact flink-avro-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
+A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting; a configuration example follows the list of supported providers below.
 
-### Configuring Starting Position
+Supported Credential Providers are:
+* `AUTO` - Using the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
+* `BASIC` - Using access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
+* `SYS_PROP` - Using Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `CUSTOM` - Use a custom user class as credential provider.
+* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token.
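+
+For example, supplying static credentials in the source configuration selects the `BASIC` provider. The snippet below is a minimal sketch; the region and credential values are placeholders, and the string-keyed constants come from `AWSConfigConstants`:
+
+```java
+// Sketch: supplying an access key ID and secret key selects the BASIC provider.
+// The credential values below are placeholders.
+Configuration sourceConfig = new Configuration();
+sourceConfig.setString(AWSConfigConstants.AWS_REGION, "us-east-1");
+sourceConfig.setString(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sourceConfig.setString(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+```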
 
-The Flink Kinesis Consumer currently provides the following options to 
configure where to start reading Kinesis streams, simply by setting 
`ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
-one of the following values in the provided configuration properties (the 
naming of the options identically follows [the namings used by the AWS Kinesis 
Streams 
service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
+### Configuring Starting Position
+To specify where the `KinesisStreamsSource` starts reading from the Kinesis stream, users can set `KinesisSourceConfigOptions.STREAM_INITIAL_POSITION` in the configuration.
+The values used follow [the naming used by the AWS Kinesis Data Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax):
 
-- `LATEST`: read all shards of all streams starting from the latest record.
-- `TRIM_HORIZON`: read all shards of all streams starting from the earliest 
record possible (data may be trimmed by Kinesis depending on the retention 
settings).
-- `AT_TIMESTAMP`: read all shards of all streams starting from a specified 
timestamp. The timestamp must also be specified in the configuration
-properties by providing a value for 
`ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following 
date pattern :
+- `LATEST`: read all shards of the stream starting from the latest record.
+- `TRIM_HORIZON`: read all shards of the stream starting from the earliest 
record possible (data may be trimmed by Kinesis depending on the retention 
settings).
+- `AT_TIMESTAMP`: read all shards of the stream starting from a specified timestamp. The timestamp must also be specified in the configuration
+  by providing a value for `KinesisSourceConfigOptions.STREAM_INITIAL_TIMESTAMP`, in one of the following date patterns (see the example after this list):
     - a non-negative double value representing the number of seconds that has 
elapsed since the Unix epoch (for example, `1459799926.480`).
-    - a user defined pattern, which is a valid pattern for `SimpleDateFormat` 
provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
-    If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined 
then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
-    (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` 
given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without 
given a pattern).
+    - a user-defined pattern, which must be a valid pattern for `SimpleDateFormat`, provided via `KinesisSourceConfigOptions.STREAM_TIMESTAMP_DATE_FORMAT`.
+      If `KinesisSourceConfigOptions.STREAM_TIMESTAMP_DATE_FORMAT` is not defined, the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
+      (for example, a timestamp value of `2016-04-04` with a user-provided pattern of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no pattern provided).
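+
+For example, the snippet below is a minimal sketch that starts reading from a specific point in time; the timestamp value is a placeholder in the default date pattern, and the option types follow `KinesisSourceConfigOptions`:
+
+```java
+// Sketch: start reading the stream from a specific timestamp.
+// The timestamp below is a placeholder in the default pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION,
+        KinesisSourceConfigOptions.InitialPosition.AT_TIMESTAMP);
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");
+```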
+
 
 ### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 
-With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume 
records from shards in Kinesis streams and
+With Flink's checkpointing enabled, the `KinesisStreamsSource` will consume 
records from shards in Kinesis streams and
 periodically checkpoint each shard's progress. In case of a job failure, Flink 
will restore the streaming program to the
 state of the latest complete checkpoint and re-consume the records from 
Kinesis shards, starting from the progress that
 was stored in the checkpoint.
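+
+To benefit from this, checkpointing must be enabled on the execution environment; a minimal sketch (the 5-second interval is illustrative):
+
+```java
+// Sketch: enable checkpointing so shard progress is periodically snapshotted.
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 ms
+```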
 
-The interval of drawing checkpoints therefore defines how much the program may 
have to go back at most, in case of a failure.
+Note that when restoring from a snapshot, the configured starting position will be ignored. The `KinesisStreamsSource`
+will proceed to read from where it left off in the snapshot. If the restored snapshot is stale (e.g. the saved shards have
+expired and are past the retention period of the Kinesis stream), it will read from the earliest possible event (effectively `TRIM_HORIZON`).
 
-To use fault tolerant Kinesis Consumers, checkpointing of the topology needs 
to be enabled at the execution environment:
+If users want to restore a Flink job from an existing snapshot but also want the configured starting position of
+the stream to be respected, they can change the `uid` of the `KinesisStreamsSource` operator to effectively restore this operator without state.
 
-{{< tabs "b1399ed7-5855-446d-9684-7a49de9b4c97" >}}
-{{< tab "Java" >}}
-```java
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.enable_checkpointing(5000) # checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< /tabs >}}
 
-Also note that Flink can only restart the topology if enough processing slots 
are available to restart the topology.
-Therefore, if the topology fails due to loss of a TaskManager, there must 
still be enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
+### Shard Assignment Strategy
+For most use cases, users would prefer a uniform distribution of records across parallel subtasks. This prevents data skew, provided data is evenly distributed in the Kinesis Data Stream.
+This is achieved by the `UniformShardAssigner`, which is the default shard assignment strategy. Users can implement their own custom strategy by implementing the `KinesisShardAssigner` interface.
 
-### Using Enhanced Fan-Out
+Achieving a uniform distribution of shards across parallel subtasks is tricky, especially if the stream has been resharded.
+Amazon Kinesis Data Streams distributes `partitionId`s evenly across the entire `HashKeyRange` of a given stream, and these ranges are evenly distributed across all open shards if `UNIFORM_SCALING` is used.
+However, there can be a mixture of open and closed shards on the Kinesis Data Stream, and the status of each shard can change during a resharding operation.
 
-[Enhanced Fan-Out 
(EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the 
maximum 
-number of concurrent consumers per Kinesis stream.
-Without EFO, all concurrent consumers share a single read quota per shard. 
-Using EFO, each consumer gets a distinct dedicated read quota per shard, 
allowing read throughput to scale with the number of consumers. 
-Using EFO will [incur additional 
cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
- 
-In order to enable EFO two additional configuration parameters are required:
+To ensure a uniform distribution of `partitionId`s across each parallel 
subtask, the `UniformShardAssigner` uses the `HashKeyRange` of each shard to 
decide which parallel subtask will read from the discovered shard.
 
-- `RECORD_PUBLISHER_TYPE`: Determines whether to use `EFO` or `POLLING`. The 
default `RecordPublisher` is `POLLING`.
-- `EFO_CONSUMER_NAME`: A name to identify the consumer. 
-For a given Kinesis data stream, each consumer must have a unique name. 
-However, consumer names do not have to be unique across data streams. 
-Reusing a consumer name will result in existing subscriptions being terminated.
+### Record ordering
+Kinesis maintains the write order of records per `partitionId` within a 
Kinesis stream. The `KinesisStreamsSource` reads
+records in the same order within a given `partitionId`, even through 
resharding operations. It does this by first checking if 
+a given shard's parents (up to 2 shards) have been completely read, before 
proceeding to read from the given shard.
 
-The code snippet below shows a simple example configurating an EFO consumer.
+### Deserialization Schema
+The `KinesisStreamsSource` retrieves binary data from the Kinesis Data Stream, 
and needs a schema to convert it into Java objects. 
+Both Flink's `DeserializationSchema` and the custom 
`KinesisDeserializationSchema` are accepted by the `KinesisStreamsSource`.
+The `KinesisDeserializationSchema` provides additional Kinesis-specific metadata per record, to allow users to make deserialization decisions based on the metadata.
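+
+For example, the snippet below is a minimal sketch of a custom Flink `DeserializationSchema`; the class name and the trimming behaviour are illustrative only, and such a schema is passed to the builder via `setDeserializationSchema(...)`:
+
+```java
+// Sketch: a custom DeserializationSchema that decodes each record as UTF-8 and trims whitespace.
+public class TrimmingStringSchema implements DeserializationSchema<String> {
+
+    @Override
+    public String deserialize(byte[] message) throws IOException {
+        return new String(message, StandardCharsets.UTF_8).trim();
+    }
+
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        // The Kinesis stream is unbounded, so never signal end of stream here.
+        return false;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
+```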
 
-{{< tabs "42345893-70c3-4678-a348-4c419b337eb1" >}}
-{{< tab "Java" >}}
-```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+For convenience, Flink provides the following schemas out of the box:
+1. `SimpleStringSchema` and `JsonDeserializationSchema`.
 
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
+2. `TypeInformationSerializationSchema` which creates a schema based on a 
Flink's `TypeInformation`.
+   This is useful if the data is both written and read by Flink.
+   This schema is a performant Flink-specific alternative to other generic 
serialization approaches.
 
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+3. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to look up the writer's schema (the schema that was used to write the record)
+   in [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Using this deserialization schema, the record will be
+   read with the schema retrieved from AWS Glue Schema Registry and transformed to either `com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`,
+   which represents a generic record with a manually provided schema, or a Java POJO generated by [mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
 
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+   <br>To use this deserialization schema one has to add the following 
additional dependency:
 
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, 
"my-flink-efo-consumer");
+{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
+{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
+{{< connector_artifact flink-json-glue-schema-registry kinesis >}}
+{{< /tab >}}
+{{< /tabs >}}
 
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
+4. `AvroDeserializationSchema` which reads data serialized with Avro format 
using a statically provided schema. It can
+   infer the schema from Avro generated classes 
(`AvroDeserializationSchema.forSpecific(...)`) or it can work with 
`GenericRecords`
+   with a manually provided schema (with 
`AvroDeserializationSchema.forGeneric(...)`). This deserialization schema 
expects that

Review Comment:
   I'm not sure - I just maintained this from the previous docs. Added the 
rephrasing


