dannycranmer commented on a change in pull request #15830:
URL: https://github.com/apache/flink/pull/15830#discussion_r631794648
##########
File path: docs/content.zh/docs/connectors/datastream/kinesis.md
##########
@@ -126,6 +126,45 @@ shard IDs are not consecutive (as result of dynamic re-sharding in Kinesis).
 For cases where skew in the assignment leads to significant imbalanced consumption,
 a custom implementation of `KinesisShardAssigner` can be set on the consumer.

+### The `DeserializationSchema`
+
+Like Flink Kafka Consumers, the Flink Kinesis Consumer also needs a schema to know how to turn the binary data in a Kinesis Data Stream into Java objects.

Review comment:
Do we really need to reference Kafka here? It assumes the reader is familiar with Kafka.

##########
File path: docs/content.zh/docs/connectors/datastream/kinesis.md
##########
@@ -126,6 +126,45 @@ shard IDs are not consecutive (as result of dynamic re-sharding in Kinesis).
 For cases where skew in the assignment leads to significant imbalanced consumption,
 a custom implementation of `KinesisShardAssigner` can be set on the consumer.

+### The `DeserializationSchema`
+
+Like Flink Kafka Consumers, the Flink Kinesis Consumer also needs a schema to know how to turn the binary data in a Kinesis Data Stream into Java objects.
+The `KinesisDeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)`
+method gets called for each Kinesis record.
+
+For convenience, Flink provides the following schemas out of the box:
+
+1. `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`) which creates
+   a schema based on a Flink's `TypeInformation`. This is useful if the data is both written and read by Flink.
+   This schema is a performant Flink-specific alternative to other generic serialization approaches.
+
+2. `JsonDeserializationSchema` (and `JSONKeyValueDeserializationSchema`) which turns the serialized JSON

Review comment:
I cannot see a `JsonDeserializationSchema` in the codebase, and `JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema`, not sure this is right for Kinesis.

##########
File path: flink-formats/flink-avro-glue-schema-registry/pom.xml
##########
@@ -33,7 +33,7 @@ under the License.
 	<packaging>jar</packaging>
 
 	<properties>
-		<glue.schema.registry.version>1.0.1</glue.schema.registry.version>
+		<glue.schema.registry.version>1.0.2</glue.schema.registry.version>

Review comment:
Really this change (and below) should be in a dedicated PR. Can you please raise a separate PR with a dedicated Jira issue and we should be able to merge this right away.

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -100,6 +100,12 @@ For convenience, Flink provides the following schemas out of the box:
     [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/docs/index.html).
     Using these deserialization schema record will be read with the schema that was retrieved from Schema Registry and transformed to a statically provided( either through `ConfluentRegistryAvroDeserializationSchema.forGeneric(...)` or `ConfluentRegistryAvroDeserializationSchema.forSpecific(...)`).
+
+  - You can also use AWS's implementation of a schema registry [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)

Review comment:
nit: same as above, reword to `You can use [AWS Glue Schema Registry]`

##########
File path: docs/content.zh/docs/connectors/datastream/kinesis.md
##########
@@ -126,6 +126,45 @@ shard IDs are not consecutive (as result of dynamic re-sharding in Kinesis).
 For cases where skew in the assignment leads to significant imbalanced consumption,
 a custom implementation of `KinesisShardAssigner` can be set on the consumer.

+### The `DeserializationSchema`
+
+Like Flink Kafka Consumers, the Flink Kinesis Consumer also needs a schema to know how to turn the binary data in a Kinesis Data Stream into Java objects.
+The `KinesisDeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)`
+method gets called for each Kinesis record.
+
+For convenience, Flink provides the following schemas out of the box:
+
+1. `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`) which creates
+   a schema based on a Flink's `TypeInformation`. This is useful if the data is both written and read by Flink.
+   This schema is a performant Flink-specific alternative to other generic serialization approaches.
+
+2. `JsonDeserializationSchema` (and `JSONKeyValueDeserializationSchema`) which turns the serialized JSON
+   into an ObjectNode object, from which fields can be accessed using `objectNode.get("field").as(Int/String/...)()`.
+   The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
+   an optional "metadata" field that exposes the offset/partition/topic for this message.
+
+3. `AvroDeserializationSchema` which reads data serialized with Avro format using a statically provided schema. It can
+   infer the schema from Avro generated classes (`AvroDeserializationSchema.forSpecific(...)`) or it can work with `GenericRecords`
+   with a manually provided schema (with `AvroDeserializationSchema.forGeneric(...)`). This deserialization schema expects that
+   the serialized records DO NOT contain embedded schema.
+
+   - You can use AWS's implementation of a schema registry [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)

Review comment:
nit: reword `You can use AWS's implementation of a schema registry [AWS Glue Schema Registry]` to `You can use [AWS Glue Schema Registry]`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
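For reference on the `JSONKeyValueDeserializationSchema` comment above: the `deserialize(...)` signature quoted in the kinesis.md diff hands a Kinesis deserializer the stream name, shard ID and sequence number of each record, which take the place of the Kafka offset/partition/topic metadata that the Kafka-oriented schema exposes. The following is a minimal sketch of such a deserializer, assuming a local stand-in interface so it compiles without Flink. `RecordDeserializer`, `TaggedRecord` and `Utf8WithMetadataSchema` are hypothetical names for illustration, not Flink classes.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class KinesisSchemaSketch {

    // Hypothetical stand-in copying the deserialize(...) signature quoted in the diff,
    // so this sketch compiles without Flink on the classpath. The real
    // KinesisDeserializationSchema also extends Serializable and ResultTypeQueryable<T>.
    interface RecordDeserializer<T> {
        T deserialize(byte[] recordValue, String partitionKey, String seqNum,
                      long approxArrivalTimestamp, String stream, String shardId)
                throws IOException;
    }

    // Hypothetical output type: the decoded payload plus the Kinesis-specific metadata
    // (stream, shard ID, sequence number) instead of Kafka's topic/partition/offset.
    static final class TaggedRecord {
        final String value;
        final String partitionKey;
        final String seqNum;
        final long approxArrivalTimestamp;
        final String stream;
        final String shardId;

        TaggedRecord(String value, String partitionKey, String seqNum,
                     long approxArrivalTimestamp, String stream, String shardId) {
            this.value = value;
            this.partitionKey = partitionKey;
            this.seqNum = seqNum;
            this.approxArrivalTimestamp = approxArrivalTimestamp;
            this.stream = stream;
            this.shardId = shardId;
        }

        @Override
        public String toString() {
            return stream + "/" + shardId + "@" + seqNum + ": " + value;
        }
    }

    // Hypothetical schema: decodes the record bytes as UTF-8 text and keeps the metadata.
    static final class Utf8WithMetadataSchema implements RecordDeserializer<TaggedRecord> {
        @Override
        public TaggedRecord deserialize(byte[] recordValue, String partitionKey, String seqNum,
                                        long approxArrivalTimestamp, String stream, String shardId) {
            String value = new String(recordValue, StandardCharsets.UTF_8);
            return new TaggedRecord(value, partitionKey, seqNum, approxArrivalTimestamp, stream, shardId);
        }
    }

    public static void main(String[] args) throws IOException {
        RecordDeserializer<TaggedRecord> schema = new Utf8WithMetadataSchema();
        // The consumer would call this once per Kinesis record; here it is invoked by hand.
        TaggedRecord record = schema.deserialize(
                "{\"id\":1}".getBytes(StandardCharsets.UTF_8),
                "pk-1", "1", 1620000000000L, "my-stream", "shardId-000000000000");
        System.out.println(record); // my-stream/shardId-000000000000@1: {"id":1}
    }
}
```

In a real job the equivalent class would implement Flink's `KinesisDeserializationSchema` directly and also report its produced type via `getProducedType()`.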