hlteoh37 commented on code in PR #179:
URL: https://github.com/apache/flink-connector-aws/pull/179#discussion_r1832527361
##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -27,644 +27,348 @@ under the License.
# Amazon Kinesis Data Streams Connector

-The Kinesis connector provides access to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
-
-To use this connector, add one or more of the following dependencies to your project, depending on whether you are reading from and/or writing to Kinesis Data Streams:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">KDS Connectivity</th>
-      <th class="text-left">Maven Dependency</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>Source</td>
-      <td>{{< connector_artifact flink-connector-kinesis kinesis >}}</td>
-    </tr>
-    <tr>
-      <td>Sink</td>
-      <td>{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}</td>
-    </tr>
-  </tbody>
-</table>
+The Kinesis connector allows users to read from and write to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
-{{< py_connector_download_link "kinesis" >}}
+## Dependency
-## Using the Amazon Kinesis Streams Service
-Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to setup Kinesis streams.
+To use this connector, add the following dependency to your project:
-## Configuring Access to Kinesis with IAM
-Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}
-Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis.
-By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.
+For use in PyFlink jobs, use the following dependency:
-A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
-
-Supported Credential Providers are:
-* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
-* `BASIC` - Using access key ID and secret key supplied as configuration.
-* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
-* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
-* `CUSTOM` - Use a custom user class as credential provider.
-* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
-* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
+{{< py_connector_download_link "kinesis" >}}
-## Kinesis Consumer
-The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is
-responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
-change as shards are closed and created by Kinesis.
+## Kinesis Streams Source
+The `KinesisStreamsSource` is an exactly-once, parallel streaming data source based on the [FLIP-27 source interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+The source subscribes to a single Amazon Kinesis Data stream, and reads events whilst maintaining order within a specific Kinesis `partitionId`.
+The `KinesisStreamsSource` will discover the shards of the stream and start reading from each shard in parallel, depending on the parallelism of the operator.
+For more details on selecting the right parallelism, see the section on [parallelism](#parallelism-and-number-of-shards).
+It also transparently handles discovery of new shards of the Kinesis Data stream if resharding occurs while the job is running.
+
+{{< hint info >}}
+Note: Before consuming data, ensure that the Kinesis Data Stream is created with `ACTIVE` status on the Amazon Kinesis Data Streams console.
+{{< /hint >}}
-Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the Amazon Kinesis Data Stream console.
+The `KinesisStreamsSource` provides a fluent builder to construct an instance of the `KinesisStreamsSource`.
+The code snippet below illustrates how to do so.
-{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3371" >}}
{{< tab "Java" >}}
```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+// Configure the KinesisStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST
+
+// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
+KinesisStreamsSource<String> kdsSource =
+    KinesisStreamsSource.<String>builder()
+        .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+        .setSourceConfig(sourceConfig)
+        .setDeserializationSchema(new SimpleStringSchema())
+        .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
+        .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+// Specify watermarking strategy and the name of the Kinesis Source operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.
+DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(kdsSource, WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "Kinesis source")
+    .returns(TypeInformation.of(String.class))
+    .uid("custom-uid");
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
-
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+val sourceConfig = new Configuration()
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON) // This is optional, by default connector will read from LATEST
-val kinesis = env.addSource(new FlinkKinesisConsumer[String](
-    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-consumer_config = {
-    'aws.region': 'us-east-1',
-    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
-    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
-    'flink.stream.initpos': 'LATEST'
-}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env = StreamExecutionEnvironment.get_execution_environment()
+val kdsSource = KinesisStreamsSource.builder[String]()
+    .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+    .setSourceConfig(sourceConfig)
+    .setDeserializationSchema(new SimpleStringSchema())
+    .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
+    .build()
-kinesis = env.add_source(FlinkKinesisConsumer("stream-1", SimpleStringSchema(), consumer_config))
+val kinesisEvents = env.fromSource(kdsSource, WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "Kinesis source")
+    .uid("custom-uid")
```
{{< /tab >}}
-{{< /tabs >}}
-The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
-instance, the configuration keys for which can be found in `AWSConfigConstants` (AWS-specific parameters) and
-`ConsumerConfigConstants` (Kinesis consumer parameters). The example
-demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
-the AWS access key ID and secret access key are directly supplied in the configuration. Also, data is being consumed
-from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
-to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
-
-Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
-
-Note that the configured parallelism of the Flink Kinesis Consumer source
-can be completely independent of the total number of shards in the Kinesis streams.
-When the number of shards is larger than the parallelism of the consumer,
-then each consumer subtask can subscribe to multiple shards; otherwise
-if the number of shards is smaller than the parallelism of the consumer,
-then some consumer subtasks will simply be idle and wait until it gets assigned
-new shards (i.e., when the streams are resharded to increase the
-number of shards for higher provisioned Kinesis service throughput).
-
-Also note that the default assignment of shards to subtasks is based on the hashes of the shard and stream names,
-which will more-or-less balance the shards across the subtasks.
-However, assuming the default Kinesis shard management is used on the stream (UpdateShardCount with `UNIFORM_SCALING`),
-setting `UniformShardAssigner` as the shard assigner on the consumer will much more evenly distribute shards to subtasks.
-Assuming the incoming Kinesis records are assigned random Kinesis `PartitionKey` or `ExplicitHashKey` values,
-the result is consistent subtask loading.
-If neither the default assigner nor the `UniformShardAssigner` suffice, a custom implementation of `KinesisShardAssigner` can be set.
-
-### The `DeserializationSchema`
-
-Flink Kinesis Consumer also needs a schema to know how to turn the binary data in a Kinesis Data Stream into Java objects.
-The `KinesisDeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)`
-method gets called for each Kinesis record.
+The above is a simple example of using the `KinesisStreamsSource`.
+- The Kinesis stream being read from is specified using the Kinesis Stream ARN.
+- Configuration for the `Source` is supplied using an instance of Flink's `Configuration` class.
+  The configuration keys can be taken from `AWSConfigOptions` (AWS-specific configuration) and `KinesisSourceConfigOptions` (Kinesis Source configuration).
+- The example specifies the starting position as `TRIM_HORIZON` (see [Configuring Starting Position](#configuring-starting-position) for more information).
+- The deserialization format is `SimpleStringSchema` (see [Deserialization Schema](#deserialization-schema) for more information).
+- The distribution of shards across subtasks is controlled using the `UniformShardAssigner` (see [Shard Assignment Strategy](#shard-assignment-strategy) for more information).
+- The example also specifies an increasing `WatermarkStrategy`, which means each record will be tagged with event time specified using `approximateArrivalTimestamp`.
+  Monotonically increasing watermarks will be generated, and subtasks will be considered idle if no record is emitted after 1 second.
-For convenience, Flink provides the following schemas out of the box:
-
-1. `TypeInformationSerializationSchema` which creates a schema based on a Flink's `TypeInformation`.
-   This is useful if the data is both written and read by Flink.
-   This schema is a performant Flink-specific alternative to other generic serialization approaches.
-
-2. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to lookup the writer's schema (schema which was used to write the record)
-   in [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html).
Using this, deserialization schema record will be
-   read with the schema retrieved from AWS Glue Schema Registry and transformed to either `com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`
-   that represents generic record with a manually provided schema or a JAVA POJO generated by [mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
-
-   <br>To use this deserialization schema one has to add the following additional dependency:
-{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
-{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
-{{< connector_artifact flink-json-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
-
-3. `AvroDeserializationSchema` which reads data serialized with Avro format using a statically provided schema. It can
-   infer the schema from Avro generated classes (`AvroDeserializationSchema.forSpecific(...)`) or it can work with `GenericRecords`
-   with a manually provided schema (with `AvroDeserializationSchema.forGeneric(...)`). This deserialization schema expects that
-   the serialized records DO NOT contain the embedded schema.
+### Configuring Access to Kinesis with IAM
+Access to Kinesis streams is controlled via IAM identities. Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
-
-   You can use [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
-   to retrieve the writer’s schema. Similarly, the deserialization record will be read with the schema from AWS Glue Schema Registry and transformed
-   (either through `GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or `GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`).
-   For more information on integrating the AWS Glue Schema Registry with Apache Flink see
-   [Use Case: Amazon Kinesis Data Analytics for Apache Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink).
+Depending on your deployment, you can select a suitable AWS Credentials Provider.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.
-   <br>To use this deserialization schema one has to add the following additional dependency:
-
-{{< tabs "71c8eb0c-6a78-476f-a52e-8a46d83f2ca4" >}}
-{{< tab "AvroDeserializationSchema" >}}
-{{< artifact flink-avro >}}
-{{< /tab >}}
-{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}}
-{{< connector_artifact flink-avro-glue-schema-registry kinesis >}}
-{{< /tab >}}
-{{< /tabs >}}
+A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
-### Configuring Starting Position
+Supported Credential Providers are:
+* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
+* `BASIC` - Using access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
+* `CUSTOM` - Use a custom user class as credential provider.
+* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
-The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
-one of the following values in the provided configuration properties (the naming of the options identically follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
+### Configuring Starting Position
+To specify where the `KinesisStreamsSource` starts reading from the Kinesis stream, users can set the `KinesisSourceConfigOptions.STREAM_INITIAL_POSITION` in configuration.
+The values used follow [the namings used by the AWS Kinesis Data Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax):
-- `LATEST`: read all shards of all streams starting from the latest record.
-- `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
-- `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration
-properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern :
+- `LATEST`: read all shards of the stream starting from the latest record.
+- `TRIM_HORIZON`: read all shards of the stream starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
+- `AT_TIMESTAMP`: read all shards of the stream starting from a specified timestamp. The timestamp must also be specified in the configuration
+  properties by providing a value for `KinesisSourceConfigOptions.STREAM_INITIAL_TIMESTAMP`, in one of the following date patterns:
  - a non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`).
-  - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
-    If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
-    (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern).
+  - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `KinesisSourceConfigOptions.STREAM_TIMESTAMP_DATE_FORMAT`.
+    If `KinesisSourceConfigOptions.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
+    (for example, a timestamp value of `2016-04-04` with a user-supplied pattern of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` without a given pattern).
+
### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
-With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
+With Flink's checkpointing enabled, the `KinesisStreamsSource` will consume records from shards in Kinesis streams and
periodically checkpoint each shard's progress.
In case of a job failure, Flink will restore the streaming program to the
state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that
was stored in the checkpoint.
-The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
+Note that when restoring from a snapshot, the configured starting positions will be ignored. The `KinesisStreamsSource`
+will proceed to read from where it left off in the snapshot. If the restored snapshot is stale (e.g. the shards saved are
+now expired and past the retention period of the Kinesis stream), it will read the earliest possible event (effectively a `TRIM_HORIZON`).
-To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled at the execution environment:
+If users want to restore a Flink job from an existing snapshot but want to respect the configured starting position of
+the stream, users can change the `uid` of the `KinesisStreamsSource` operator to effectively restore this operator without state.
-{{< tabs "b1399ed7-5855-446d-9684-7a49de9b4c97" >}}
-{{< tab "Java" >}}
-```java
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-```python
-env = StreamExecutionEnvironment.get_execution_environment()
-env.enable_checkpointing(5000) # checkpoint every 5000 msecs
-```
-{{< /tab >}}
-{{< /tabs >}}
-Also note that Flink can only restart the topology if enough processing slots are available to restart the topology.
-Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
+### Shard Assignment Strategy
+For most use cases, users would prefer a uniform distribution of records across parallel subtasks. This prevents data skew if data is evenly distributed in the Kinesis Data Stream.
+This is achieved by the `UniformShardAssigner`, which is the default shard assignment strategy. Users can implement their own custom strategy by implementing the `KinesisShardAssigner` interface.
-### Using Enhanced Fan-Out
+Achieving a uniform distribution of shards across parallel subtasks is non-trivial, especially if the stream has been resharded.
+Amazon Kinesis Data streams distributes `partitionId`s evenly across the entire `HashKeyRange` of a given stream, and these ranges are evenly distributed across all open shards if `UNIFORM_SCALING` is used.
+However, there will be a mixture of Open and Closed shards on the Kinesis Data Stream, and the status of each shard can change during a rescaling operation.
-[Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum
-number of concurrent consumers per Kinesis stream.
-Without EFO, all concurrent consumers share a single read quota per shard.
-Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers.
-Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
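The exactly-once semantics described above rely on checkpointing being enabled on the execution environment, as the removed snippet in this hunk used to show. A minimal sketch of enabling it (the 5000 ms interval is illustrative, not a recommendation):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Draw a checkpoint every 5000 ms; the checkpoint interval bounds how far the source may rewind after a failure.
env.enableCheckpointing(5000);
```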
-
-In order to enable EFO two additional configuration parameters are required:
+To ensure a uniform distribution of `partitionId`s across each parallel subtask, the `UniformShardAssigner` uses the `HashKeyRange` of each shard to decide which parallel subtask will read from the discovered shard.
-- `RECORD_PUBLISHER_TYPE`: Determines whether to use `EFO` or `POLLING`. The default `RecordPublisher` is `POLLING`.
-- `EFO_CONSUMER_NAME`: A name to identify the consumer.
-For a given Kinesis data stream, each consumer must have a unique name.
-However, consumer names do not have to be unique across data streams.
-Reusing a consumer name will result in existing subscriptions being terminated.
+### Record ordering
+Kinesis maintains the write order of records per `partitionId` within a Kinesis stream. The `KinesisStreamsSource` reads
+records in the same order within a given `partitionId`, even through resharding operations. It does this by first checking if
+a given shard's parents (up to 2 shards) have been completely read, before proceeding to read from the given shard.
-The code snippet below shows a simple example configurating an EFO consumer.
+### Deserialization Schema
+The `KinesisStreamsSource` retrieves binary data from the Kinesis Data Stream, and needs a schema to convert it into Java objects.
+Both Flink's `DeserializationSchema` and the custom `KinesisDeserializationSchema` are accepted by the `KinesisStreamsSource`.
+The `KinesisDeserializationSchema` provides additional Kinesis-specific metadata per record to allow users to make deserialization decisions based on the metadata.
-{{< tabs "42345893-70c3-4678-a348-4c419b337eb1" >}}
-{{< tab "Java" >}}
-```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+For convenience, Flink provides the following schemas out of the box:
+1. `SimpleStringSchema` and `JsonDeserializationSchema`.
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+2. `TypeInformationSerializationSchema` which creates a schema based on Flink's `TypeInformation`.
+   This is useful if the data is both written and read by Flink.
+   This schema is a performant Flink-specific alternative to other generic serialization approaches.
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+3. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to lookup the writer's schema (schema which was used to write the record)
+   in [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Using this, the deserialization schema will read the record
+   with the schema retrieved from AWS Glue Schema Registry and transform it to either `com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`,
+   which represents a generic record with a manually provided schema, or a Java POJO generated by [mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val consumerConfig = new Properties()
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+   <br>To use this deserialization schema one has to add the following additional dependency:
-consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
-    ConsumerConfigConstants.RecordPublisherType.EFO.name());
-consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
+{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
+{{< connector_artifact flink-json-glue-schema-registry kinesis >}}
+{{< /tab >}}
+{{< /tabs >}}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
+4. `AvroDeserializationSchema` which reads data serialized with Avro format using a statically provided schema. It can
+   infer the schema from Avro generated classes (`AvroDeserializationSchema.forSpecific(...)`) or it can work with `GenericRecords`
+   with a manually provided schema (with `AvroDeserializationSchema.forGeneric(...)`). This deserialization schema expects that

Review Comment:
   I'm not sure - I just maintained this from the previous docs. Added the rephrasing.