wuchong commented on a change in pull request #12805:
URL: https://github.com/apache/flink/pull/12805#discussion_r451951070
########## File path: docs/dev/table/connectors/kafka.md ##########

```diff
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instances
 
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
 
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.semantic` option:
+
+ * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+ * `EXACTLY_ONCE`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
```

Review comment (on the `AT_LEAST_ONCE` line):
```suggestion
 * `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
```

Review comment (on the `EXACTLY_ONCE` line):
```suggestion
 * `exactly-once`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
```

Review comment (on the `NONE` line):
```suggestion
 * `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
```

########## File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java ##########

```diff
@@ -65,18 +69,27 @@ protected KafkaDynamicSinkBase createKafkaTableSink(
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSemantic semantic) {
 		return new Kafka010DynamicSink(
 			consumedDataType,
 			topic,
 			properties,
 			partitioner,
-			encodingFormat);
+			encodingFormat,
+			semantic);
 	}
 
 	@Override
 	public String factoryIdentifier() {
 		return IDENTIFIER;
 	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = super.optionalOptions();
+		options.remove(SINK_SEMANTIC);
```

Review comment:
Add a comment on this to explain why we remove sink semantic in 0.10.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
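As context for the last review comment, one possible shape of the requested explanatory comment follows. The rationale given here — that the Kafka 0.10 client predates Kafka transactions (introduced in 0.11) and therefore cannot honor `exactly-once` — is an inference from the docs change in this same PR, not something stated in the thread, and the fragment assumes the surrounding Flink factory class, so it does not compile in isolation:

```java
// Sketch only: assumes the enclosing Kafka010DynamicTableFactory class and
// Flink's ConfigOption/SINK_SEMANTIC definitions from this PR.
@Override
public Set<ConfigOption<?>> optionalOptions() {
	final Set<ConfigOption<?>> options = super.optionalOptions();
	// Kafka 0.10 predates Kafka transactions (added in 0.11), so this connector
	// can only offer at-least-once delivery. Hiding 'sink.semantic' prevents
	// users from requesting a guarantee the 0.10 client cannot provide.
	options.remove(SINK_SEMANTIC);
	return options;
}
```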
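Since the `sink.semantic` values under discussion are table options, a hedged sketch of how the lowercase spelling the reviewer suggests would appear in a Kafka sink DDL may be useful. The table name, schema, topic, and broker address below are invented for illustration, not taken from the PR:

```sql
-- Illustrative sketch only: table name, columns, topic, and broker address are made up.
CREATE TABLE kafka_sink (
  user_id BIGINT,
  message STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'output-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- lowercase value, matching the review suggestions; 'at-least-once' is the default
  'sink.semantic' = 'exactly-once'
);
```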